Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SMQ - 2546 - Add telemetry aggregation for clients telemetry #1

Open
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

felixgateru
Copy link
Owner

@felixgateru felixgateru commented Jan 25, 2025

What type of PR is this?

What does this do?

Which issue(s) does this PR fix/relate to?

  • Related Issue #
  • Resolves #

Have you included tests for your changes?

Did you document any new/modified feature?

Notes

Summary by Sourcery

New Features:

  • Added the ability to aggregate telemetry data for clients, including inbound and outbound message counts, subscriptions, and connection status.

Summary by CodeRabbit

Here are the updated release notes based on the comprehensive changes:

  • New Features

    • Added event store middleware for messaging services.
    • Enhanced client telemetry tracking with improved subscription and message count management.
    • Introduced new event handling capabilities across adapters (MQTT, WebSocket, CoAP).
    • Added new ClientID field in subscriber configurations for better identification.
  • Improvements

    • Refined client authentication and authorization processes.
    • Updated database schema for more granular telemetry tracking.
    • Improved error handling across multiple services.
  • Infrastructure

    • Updated Docker Compose configuration.
    • Added new services for domains management.
    • Updated dependency versions.
  • Breaking Changes

    • Modified client subscription and telemetry data models.
    • Adjusted adapter interfaces for event handling.

Copy link

sourcery-ai bot commented Jan 25, 2025

Reviewer's Guide by Sourcery

This pull request introduces telemetry aggregation for clients, tracking client connections, subscriptions, and message counts. It modifies the journal service to handle telemetry events based on journal operations, and updates the postgres repository to persist and aggregate this data. Additionally, it adds event publishing to the ws, http and mqtt handlers.

Sequence diagram for client telemetry aggregation flow

sequenceDiagram
    participant Client
    participant Adapter
    participant EventStore
    participant Journal
    participant DB

    Client->>Adapter: Connect/Subscribe/Publish
    Adapter->>EventStore: Publish event
    EventStore->>Journal: Save event
    Journal->>Journal: handleTelemetry()
    alt client.create
        Journal->>DB: SaveClientTelemetry
    else client.subscribe
        Journal->>DB: AddSubscription
    else client.publish
        Journal->>DB: IncrementInboundMessages
        Journal->>DB: IncrementOutboundMessages
    end
Loading

Entity relationship diagram for client telemetry

erDiagram
    clients_telemetry {
        string client_id PK
        string domain_id PK
        text[] subscriptions
        bigint inbound_messages
        bigint outbound_messages
        timestamp first_seen
        timestamp last_seen
    }

    journals {
        string id PK
        string operation
        jsonb attributes
    }

    journals ||--o{ clients_telemetry : "updates"
Loading

File-Level Changes

Change Details Files
Adds constants for all client operations.
  • Introduces constants for client create, remove, subscribe, unsubscribe, publish and disconnect operations for coap, http, mqtt and ws.
journal/service.go
Modifies the journal service to handle telemetry events.
  • Adds a handleTelemetry method to process journal entries and trigger telemetry updates.
  • Adds methods to handle client creation, removal, subscription, unsubscription, and message count updates.
  • Adds methods to convert journal entries to client and adapter events.
journal/service.go
Updates the postgres repository to persist and aggregate telemetry data.
  • Adds methods to save, add, remove, and increment client telemetry data.
  • Adds methods to increment inbound and outbound message counts.
  • Adds methods to remove subscriptions based on connection ID.
journal/postgres/telemetry.go
Adds event publishing to the ws handler.
  • Adds an event store to the ws handler.
  • Publishes events on subscribe, unsubscribe and publish.
ws/handler.go
Adds event publishing to the mqtt handler.
  • Adds an event store to the mqtt handler.
  • Publishes events on connect, disconnect, subscribe, unsubscribe and publish.
mqtt/handler.go
mqtt/events/streams.go
mqtt/events/events.go
mqtt/mocks/events.go
Adds event publishing to the http handler.
  • Adds an event store to the http handler.
  • Publishes events on connect and publish.
http/handler.go
http/events/streams.go
http/events/events.go
http/mocks/events.go
Adds event publishing to the coap adapter.
  • Adds an event store middleware to the coap adapter.
  • Publishes events on subscribe, unsubscribe and publish.
coap/adapter.go
coap/api/transport.go
coap/events/stream.go
coap/events/events.go
coap/events/doc.go

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!
  • Generate a plan of action for an issue: Comment @sourcery-ai plan on
    an issue to generate a plan of action for it.

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link

coderabbitai bot commented Jan 25, 2025

Walkthrough

The pull request introduces comprehensive changes across multiple components of the SuperMQ messaging system, focusing on enhancing telemetry, event handling, and subscription management. Key modifications include adding event store middleware, refactoring client telemetry tracking, introducing new event structures, and updating database schemas. The changes span adapter services (MQTT, CoAP, WebSocket), journal management, and messaging infrastructure, with a particular emphasis on more granular tracking of client interactions, subscriptions, and message flows.

Changes

File/Package Change Summary
cmd/**/main.go Added event store URL configuration and middleware initialization for adapters.
journal/journal.go Updated client telemetry structure, added new repository methods for subscription and message tracking.
journal/postgres/init.go Modified database schema, introduced new subscriptions table.
mqtt/events/events.go Replaced mqttEvent with connectEvent and subscribeEvent structures.
pkg/messaging/events/* New package for event handling middleware and publisher implementations.
pkg/messaging/pubsub.go Added ClientID field to SubscriberConfig.
journal/api/responses.go Introduced new clientTelemetryRes struct for API responses.
journal/middleware/authorization.go Updated RetrieveClientTelemetry method to change user identifier used for authorization.
mqtt/handler.go Enhanced error handling and refactored connection/subscription logic.
mqtt/handler_test.go Updated test mocks to reflect changes in method signatures.
mqtt/mocks/events.go Removed mock implementation for the EventStore type.

Sequence Diagram

sequenceDiagram
    participant Client
    participant Adapter
    participant EventStore
    participant Repository

    Client->>Adapter: Connect/Subscribe
    Adapter->>EventStore: Publish Connection/Subscription Event
    EventStore-->>Repository: Record Telemetry
    Adapter->>Repository: Save Client Details
    Repository-->>Adapter: Confirmation
    Adapter-->>Client: Connection/Subscription Established
Loading

Poem

🐰 In the realm of messages bright,
Where bytes dance with pure delight,
Our telemetry now stands tall,
Tracking clients, big and small!
SuperMQ hops with coding might! 🚀

✨ Finishing Touches
  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @felixgateru - I've reviewed your changes and found some issues that need to be addressed.

Blocking issues:

  • The createdAt timestamp parsing logic is incorrect and could cause panics (link)

Overall Comments:

  • There appears to be a bug in the toClientEvent function where the createdAt timestamp is only parsed if the string is empty (if createdAtStr == ""). This should be reversed to parse when the string is not empty.
Here's what I looked at during the review
  • 🔴 General issues: 1 blocking issue, 1 other issue
  • 🟢 Security: all looks good
  • 🟢 Testing: all looks good
  • 🟢 Complexity: all looks good
  • 🟢 Documentation: all looks good

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines 185 to 236
createdAtStr := journal.Attributes["created_at"].(string)
if createdAtStr == "" {
createdAt, err = time.Parse(time.RFC3339, createdAtStr)
if err != nil {
return clientEvent{}, fmt.Errorf("invalid created_at format")
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): The createdAt timestamp parsing logic is incorrect and could cause panics

The condition is backwards - it's trying to parse only when createdAtStr is empty. This should be reversed to parse only when createdAtStr is non-empty. Also, consider adding error handling for the type assertion.


func (repo *repository) IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error {
q := `
WITH matched_clients AS (
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): The LIKE query with wildcards on both ends could have performance implications

While there is a GIN index on subscriptions, using LIKE with wildcards on both ends ('%' || $1 || ':' || $2 || '%') could still be slow on large datasets. Consider if the subscription format could be changed to enable more efficient querying.

Suggested implementation:

			WHERE 
				sub = $1 || ':' || $2

To fully implement this change, you'll also need to:

  1. Update the schema to ensure subscriptions are stored in the exact format channelID:subtopic without wildcards
  2. Migrate existing subscription data to the new format
  3. Add a B-tree index on the unnested subscriptions if not already present:
    CREATE INDEX idx_clients_telemetry_subscriptions ON clients_telemetry USING gin (subscriptions);
  4. Update any code that stores subscriptions to use the new format
  5. Consider if you need partial matches - if so, you might want to store multiple subscription patterns (e.g., "channelID:*" and "channelID:subtopic") rather than using LIKE

The exact implementation details will depend on your subscription matching requirements and existing subscription patterns.

Copy link

@llamapreview llamapreview bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auto Pull Request Review from LlamaPReview

Large PR Notification

Dear contributor,

Thank you for your substantial contribution to this project. LlamaPReview has detected that this Pull Request contains a large volume of changes, which exceeds our current processing capacity.

Details:

  • PR and related contents total size: Approximately 106,974 characters
  • Current limit: 50,000 characters

Next steps:

  1. Consider breaking this PR into smaller, more focused changes if possible.
  2. For manual review, please reach out to your team members or maintainers.

We appreciate your understanding and commitment to improving this project. Your contributions are valuable, and we want to ensure they receive the attention they deserve.

LlamaPReview is continuously evolving to better serve the community. Share your thoughts on handling large PRs in our GitHub Discussions - your feedback helps us improve and expand our capabilities.

If you have any questions or need assistance, our community and support team are here to help.

Best regards,
LlamaPReview Team

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 16

🔭 Outside diff range comments (2)
ws/handler.go (1)

Line range hint 292-343: authAccess needs stricter substring checks.
Line 323 checks if len(channelParts) < 1, but you later reference channelParts[1]. Use < 2 to avoid out-of-range errors.

-	if len(channelParts) < 1 {
+	if len(channelParts) < 2 {
mqtt/handler_test.go (1)

Line range hint 465-508: Add test cases for error scenarios in Unsubscribe.

The test cases for Unsubscribe don't cover error scenarios from the eventStore.

Add test cases for:

  1. EventStore returns an error
  2. Invalid topic format
  3. Empty topic list
 cases := []struct {
     desc      string
     session   *session.Session
     channelID string
     topic     []string
     logMsg    string
     authZRes  *grpcChannelsV1.AuthzRes
     authZErr  error
     err       error
 }{
+    {
+        desc:      "unsubscribe with eventStore error",
+        session:   &sessionClient,
+        topic:     topics,
+        channelID: chanID,
+        authZRes:  &grpcChannelsV1.AuthzRes{Authorized: true},
+        err:       errors.New("eventStore error"),
+    },
+    {
+        desc:      "unsubscribe with empty topic list",
+        session:   &sessionClient,
+        topic:     []string{},
+        channelID: chanID,
+        authZRes:  &grpcChannelsV1.AuthzRes{Authorized: true},
+        err:       mqtt.ErrMissingTopicUnsub,
+    },
🧹 Nitpick comments (25)
mqtt/events/streams.go (2)

26-27: Consider renaming instance to a more descriptive field name.

The field instance might be unclear to new contributors. A more specific name such as serviceInstance or deploymentID can improve code clarity.

-	instance  string
+	serviceInstance string

57-65: Fix the doc comment mismatch.

The doc string at line 57 references "MQTT CONNECT" while describing the Disconnect method. Update it for clarity since it is actually handling the MQTT DISCONNECT event.

-// Disconnect issues event on MQTT CONNECT.
+// Disconnect issues event on MQTT DISCONNECT.
coap/api/transport.go (2)

53-53: Global clients variable could pose concurrency risks.

Defining clients as a package-level variable can lead to concurrency or testing complexities if multiple CoAP servers run in parallel. Consider storing this in a struct for thread safety.

-var clients grpcClientsV1.ClientsServiceClient
+type coapAPIInstance struct {
+   clients grpcClientsV1.ClientsServiceClient
+   ...
+}

100-105: Centralized authentication call block.

This snippet ensures authentication is checked centrally. Should the authentication service be unavailable, consider returning a 503 Service Unavailable code to differentiate connectivity failures from invalid credentials.

journal/service.go (4)

52-58: Consider atomic persistence vs. telemetry fallback.
The telemetry handling is triggered after saving the journal. If telemetry fails, the journal save still persists. Consider whether a transactional approach (both or none) is desirable for consistency.


127-134: Potential duplicate subscriptions.
Appending a subscription string with connID:channelID:topic might cause duplicates. If it’s expected, fine; otherwise consider a uniqueness check or a set-based approach.


145-157: Partial update considerations.
updateMessageCount increments inbound and outbound messages in separate calls. If inbound fails, outbound is never incremented. If this partial failure is undesired, consider unified error handling or a transaction.


168-173: clientEvent struct is concise.
The struct is private, but if it needs external usage, consider exporting or documenting the fields.

journal/postgres/telemetry.go (2)

83-110: AddSubscription usage of ARRAY_APPEND is straightforward.
Be mindful of concurrency or repeated additions. If duplicates are problematic, consider a more robust approach.


187-215: Performance consideration in IncrementOutboundMessages.
Using a subselect with string matching on subscriptions can be costly in large datasets. Ensure an index or alternative approach to handle scale.

http/events/events.go (1)

8-12: Consider adding more event types for comprehensive HTTP operation tracking.

The current implementation only tracks client publish and connect events. Consider adding events for disconnect, error scenarios, and other significant HTTP operations for complete telemetry.

journal/api/responses.go (1)

38-44: Consider dynamic status codes based on operation result.

The Code() method always returns OK. Consider returning different status codes based on the operation result or content:

 func (res clientTelemetryRes) Code() int {
-	return http.StatusOK
+	if res.Empty() {
+		return http.StatusNoContent
+	}
+	return http.StatusOK
 }
mqtt/events/events.go (1)

21-24: Consider adding field documentation.

While the field names are self-explanatory, adding documentation comments would help clarify their purpose and expected values.

Add documentation like this:

 type mqttEvent struct {
+    // operation represents the type of MQTT event (e.g., publish, subscribe)
     operation string
+    // channelID uniquely identifies the MQTT channel
     channelID string
+    // clientID uniquely identifies the MQTT client
     clientID  string
+    // connID uniquely identifies the MQTT connection
     connID    string
+    // topic represents the MQTT topic for publish/subscribe events
     topic     string
     instance  string
 }
http/events/streams.go (1)

27-39: Enhance error handling in NewEventStore.

Consider wrapping errors with additional context and handling potential context cancellation.

 func NewEventStore(ctx context.Context, url, instance string) (EventStore, error) {
+    if ctx.Err() != nil {
+        return nil, fmt.Errorf("context error: %w", ctx.Err())
+    }
     publisher, err := store.NewPublisher(ctx, url, streamID)
     if err != nil {
-        return nil, err
+        return nil, fmt.Errorf("failed to create publisher: %w", err)
     }
journal/postgres/init.go (1)

33-33: Consider adding foreign key constraints and column comments.

While the default value for subscriptions is good, consider:

  1. Adding foreign key constraints for client_id and domain_id.
  2. Adding column comments to document the purpose and format of each column.
 CREATE TABLE IF NOT EXISTS clients_telemetry (
     client_id         VARCHAR(36) NOT NULL,
     domain_id         VARCHAR(36) NOT NULL,
-    subscriptions     TEXT[] DEFAULT '{}',
+    subscriptions     TEXT[] DEFAULT '{}' NOT NULL,
     inbound_messages  BIGINT DEFAULT 0,
     outbound_messages BIGINT DEFAULT 0,
     first_seen        TIMESTAMP,
     last_seen         TIMESTAMP,
-    PRIMARY KEY (client_id, domain_id)
+    PRIMARY KEY (client_id, domain_id),
+    FOREIGN KEY (client_id) REFERENCES clients(id) ON DELETE CASCADE,
+    FOREIGN KEY (domain_id) REFERENCES domains(id) ON DELETE CASCADE,
+    CONSTRAINT check_positive_messages CHECK (inbound_messages >= 0 AND outbound_messages >= 0)
 )

Also add column comments:

COMMENT ON COLUMN clients_telemetry.subscriptions IS 'Array of topic subscriptions for the client';
COMMENT ON COLUMN clients_telemetry.inbound_messages IS 'Count of messages received from the client';
COMMENT ON COLUMN clients_telemetry.outbound_messages IS 'Count of messages sent to the client';
coap/events/stream.go (2)

24-34: Consider adding context timeout handling.

The NewEventStoreMiddleware function should consider implementing context timeout handling for the publisher initialization.

 func NewEventStoreMiddleware(ctx context.Context, svc coap.Service, url string) (coap.Service, error) {
+	// Add timeout for publisher initialization
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
 	publisher, err := store.NewPublisher(ctx, url, streamID)
 	if err != nil {
 		return nil, err
 	}

36-53: Consider implementing retry mechanism for event publishing.

The event publishing operations (Publish, Subscribe, Unsubscribe) could benefit from a retry mechanism to handle temporary failures. Also, consider logging failed event publishing attempts for better observability.

Here's an example implementation for the Publish method (apply similar pattern to Subscribe and Unsubscribe):

 func (es *eventStore) Publish(ctx context.Context, clientID string, msg *messaging.Message) error {
 	err := es.svc.Publish(ctx, clientID, msg)
 	if err != nil {
 		return err
 	}

 	event := coapEvent{
 		operation: clientPublish,
 		clientID:  clientID,
 		channelID: msg.GetChannel(),
 		topic:     msg.GetSubtopic(),
 	}
-	if err := es.events.Publish(ctx, event); err != nil {
-		return err
-	}
+	// Implement retry with backoff
+	backoff := time.Second
+	for retries := 3; retries > 0; retries-- {
+		if err := es.events.Publish(ctx, event); err != nil {
+			if retries == 1 {
+				log.Printf("Failed to publish event after retries: %v", err)
+				return err
+			}
+			time.Sleep(backoff)
+			backoff *= 2
+			continue
+		}
+		break
+	}

 	return nil
 }

Also applies to: 55-73, 75-93

pkg/sdk/message_test.go (1)

151-152: Add test cases for event store error scenarios.

While the event store mock integration is good, consider adding test cases for:

  • Failed Connect calls
  • Failed Publish calls
  • Error propagation from event store to client

Also applies to: 163-164

journal/journal.go (1)

183-196: Consider adding batch operations for performance.

The new telemetry methods are well-defined, but consider adding batch operations for better performance when dealing with multiple subscriptions or message counts.

Add these methods to the interface:

+// AddSubscriptions adds multiple subscriptions to the client telemetry in a single operation.
+AddSubscriptions(ctx context.Context, clientID string, subs []string) error
+
+// IncrementMessageCounts increments both inbound and outbound message counts in a single operation.
+IncrementMessageCounts(ctx context.Context, clientID, channelID, subtopic string, inbound, outbound uint64) error
cmd/ws/main.go (1)

148-153: Consider adding connection retry logic.

The EventStore creation has proper error handling but could benefit from connection retry logic for better resilience.

Consider implementing retry logic:

+func createEventStoreWithRetry(ctx context.Context, url, instanceID string, maxRetries int) (events.EventStore, error) {
+    var lastErr error
+    for i := 0; i < maxRetries; i++ {
+        es, err := events.NewEventStore(ctx, url, instanceID)
+        if err == nil {
+            return es, nil
+        }
+        lastErr = err
+        time.Sleep(time.Second * time.Duration(i+1))
+    }
+    return nil, fmt.Errorf("failed to create event store after %d retries: %w", maxRetries, lastErr)
+}
http/api/endpoint_test.go (1)

229-230: Consider using mock.MatchedBy for event validation.

The EventStore mock expectations could be more specific about the expected event data.

Consider using mock.MatchedBy for better validation:

-eventsCall := es.On("Connect", mock.Anything, mock.Anything).Return(nil)
-eventsCall1 := es.On("Publish", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+eventsCall := es.On("Connect", mock.Anything, mock.MatchedBy(func(req interface{}) bool {
+    // Add specific validation logic
+    return true
+})).Return(nil)
+eventsCall1 := es.On("Publish", mock.Anything, mock.MatchedBy(func(event interface{}) bool {
+    // Add specific validation logic
+    return true
+}), mock.Anything, mock.Anything).Return(nil)

Also applies to: 247-248

mqtt/handler.go (1)

283-298: Consider enhancing topic parsing validation.

The topic parsing function could benefit from additional validation.

Consider adding these validations:

 func parseTopic(topic string) (string, string, error) {
+	if topic == "" {
+		return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
+	}
+
 	channelParts := channelRegExp.FindStringSubmatch(topic)
 	if len(channelParts) < 2 {
 		return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
 	}
 
 	chanID := channelParts[1]
+	if len(chanID) == 0 {
+		return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
+	}
+
 	subtopic := channelParts[2]
 
 	subtopic, err := parseSubtopic(subtopic)
http/handler_test.go (1)

128-135: Consider adding negative test case for Connect.

The event store connect operation is only tested for success case.

Add a test case for connect failure:

 		{
 			desc:    "connect with valid username and password",
 			err:     nil,
 			session: &sessionClient,
 		},
+		{
+			desc:    "connect with event store failure",
+			err:     errFailedPublishEvent,
+			session: &sessionClient,
+			esErr:   errors.New("failed to connect to event store"),
+		},
ws/events/stream.go (1)

22-26: Consider adding configuration options.

The eventStore struct could benefit from additional configuration options.

Consider adding:

  1. Timeout configuration
  2. Retry policy
  3. Buffer size for events
 type eventStore struct {
     publisher events.Publisher
     instance  string
+    config    struct {
+        timeout    time.Duration
+        retryPolicy RetryPolicy
+        bufferSize  int
+    }
 }

+type RetryPolicy struct {
+    maxRetries  int
+    backoffTime time.Duration
+}
docker/docker-compose.yml (1)

178-178: Consider adding health check for Redis.

The domains-redis service should include health checks.

Add health check configuration:

 domains-redis:
   image: redis:7.2.4-alpine
   container_name: supermq-domains-redis
   restart: on-failure
+  healthcheck:
+    test: ["CMD", "redis-cli", "ping"]
+    interval: 10s
+    timeout: 5s
+    retries: 3
   networks:
     - supermq-base-net
   volumes:
     - supermq-domains-redis-volume:/data
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d77d005 and a15d2c7.

📒 Files selected for processing (40)
  • cmd/coap/main.go (4 hunks)
  • cmd/http/main.go (5 hunks)
  • cmd/ws/main.go (4 hunks)
  • coap/adapter.go (3 hunks)
  • coap/api/doc.go (1 hunks)
  • coap/api/logging.go (0 hunks)
  • coap/api/metrics.go (0 hunks)
  • coap/api/transport.go (7 hunks)
  • coap/events/doc.go (1 hunks)
  • coap/events/events.go (1 hunks)
  • coap/events/stream.go (1 hunks)
  • coap/tracing/adapter.go (0 hunks)
  • docker/docker-compose.yml (4 hunks)
  • http/api/endpoint_test.go (6 hunks)
  • http/events/doc.go (1 hunks)
  • http/events/events.go (1 hunks)
  • http/events/streams.go (1 hunks)
  • http/handler.go (5 hunks)
  • http/handler_test.go (7 hunks)
  • http/mocks/events.go (1 hunks)
  • journal/api/responses.go (2 hunks)
  • journal/journal.go (1 hunks)
  • journal/middleware/authorization.go (1 hunks)
  • journal/mocks/repository.go (2 hunks)
  • journal/postgres/init.go (1 hunks)
  • journal/postgres/telemetry.go (2 hunks)
  • journal/service.go (3 hunks)
  • mqtt/events/events.go (1 hunks)
  • mqtt/events/streams.go (2 hunks)
  • mqtt/handler.go (9 hunks)
  • mqtt/handler_test.go (7 hunks)
  • mqtt/mocks/events.go (1 hunks)
  • pkg/sdk/health_test.go (1 hunks)
  • pkg/sdk/message_test.go (5 hunks)
  • ws/api/endpoint_test.go (3 hunks)
  • ws/events/doc.go (1 hunks)
  • ws/events/events.go (1 hunks)
  • ws/events/stream.go (1 hunks)
  • ws/handler.go (11 hunks)
  • ws/mocks/events.go (1 hunks)
💤 Files with no reviewable changes (3)
  • coap/api/metrics.go
  • coap/tracing/adapter.go
  • coap/api/logging.go
✅ Files skipped from review due to trivial changes (4)
  • ws/events/doc.go
  • coap/events/doc.go
  • http/events/doc.go
  • coap/api/doc.go
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: api-test
  • GitHub Check: Lint and Build
🔇 Additional comments (71)
mqtt/events/streams.go (6)

17-21: Validate new interface methods for consistency.

You've introduced additional parameters to Connect/Disconnect and new methods for publishing and subscribing events. This is a well-structured approach for telemetry, but please ensure all client code (including tests) is updated accordingly to match these function signatures to avoid runtime errors.


40-40: Constructor usage looks good.

The publisher is properly instantiated. Just be mindful of potential resource cleanup if publisher requires closing or flushing on shutdown.


45-53: Confirm event data for CONNECT.

The Connect method properly populates operation = clientConnect. Ensure that the event schema (in any downstream consumer) accommodates connID for correlation or debugging.


68-79: Publish method is consistent with the event-driven design.

The approach of building an mqttEvent object and invoking the publisher is clear and maintainable. Good job!


81-93: Subscribe method aligns well with the new interface.

The updated signature with additional parameters (e.g., connID) looks consistent with your telemetry tracking needs.


95-106: Unsubscribe method matches the new telemetry requirements.

The final event is prepared similarly to Subscribe and Publish, ensuring traceability of unsubscribed topics. Nicely done.

coap/adapter.go (5)

29-36: New method signatures for Publish, Subscribe, and Unsubscribe.

These changes removing reliance on a separate key parameter and introducing clientID bring the CoAP adapter in line with your expanded interface. Check for any lingering references to the outdated parameter in your codebase and tests.


48-48: Constructor updated to remove clients parameter.

The removal of clients in New reduces complexity. Double-check that no external modules still rely on an older constructor signature.


Line range hint 57-71: Ensure compliance with PII and logging policies.

Storing the clientID as msg.Publisher is logical. If clientID can be considered sensitive, confirm that it's not inadvertently exposed in logs or external data exports.


76-76: Subscribe method.

You properly handle authorization, then build a subscription config. This is well-executed.


104-107: Verify domain usage in unsubscribe.

Here, DomainId is set to an empty string. Confirm if this is intentional or if it should reflect a non-empty domain for improved scope or multi-tenancy control.

coap/api/transport.go (5)

18-18: Importing gRPC clients.

This import (grpcClientsV1) is necessary for the new authentication flow. Good addition.


66-69: Expanded MakeCoAPHandler signature.

Passing cli grpcClientsV1.ClientsServiceClient into the handler clarifies the flow of dependencies. Make sure to gracefully handle nil or misconfigured clients in error scenarios.


110-113: Handling GET/POST with clientID.

Replacing the old key usage with clientID is consistent with the new approach. This clarifies who is actually publishing messages or initiating requests.


Line range hint 134-148: Refined handleGet logic for subscribe/unsubscribe.

You robustly track the observation state, unsubscribing when needed. This logic is clear and well-structured.


226-239: authenticate function effectively delegates to gRPC.

Excellent job encapsulating the complexity of verifying a client’s credentials. Just confirm that any logs around failed authentication do not inadvertently leak sensitive data like key.

journal/service.go (8)

8-9: Imports look good.
These standard library imports (fmt, time) are fine and align with usage in subsequent code.


17-31: Constants for operations are well structured.
Defining these string constants for various client operations is a clean way to handle event/telemetry logic.


79-103: Switch-based event handling is clear.
This switch statement is easy to follow. For future extensibility, ensure it stays manageable or consider more decoupled strategies (e.g., mapping operations to function pointers).


105-117: Insert-only client telemetry may need handling for duplicates.
addClientTelemetry simply creates a fresh record. If the client already exists, consider if an upsert or conflict resolution is required, depending on business logic.


119-125: Removal of client telemetry is straightforward.
Relies on DeleteClientTelemetry. Ensure upstream logic gracefully handles any references to a removed client.


136-143: Remove subscription logic is consistent.
No immediate concerns with removeSubscription. The repository approach is aligned with your add/remove design.


159-166: Bulk removal by connection ID.
removeSubscriptionWithConnID removes all subscriptions matching that connection ID. This is fine as long as that broad removal is intended.


199-205: adapterEvent struct looks good.
No immediate issues. This is a straightforward container for event data.

journal/postgres/telemetry.go (4)

19-20: Columns for inbound/outbound messages align with new design.
The insert statement properly references the extended schema.


112-138: RemoveSubscription logic is clean.
Removes the specified subscription from the array. The rowsAffected check properly handles unknown clients or missing subscriptions.


140-156: Removing subscription by connID is direct but removes all matching.
If partial removal is required, this might be too broad. Otherwise, this approach is acceptable for a bulk cleanup.


158-185: Incrementing inbound messages.
This is an effective approach. Consider whether domain constraints and concurrency might affect correctness.

ws/handler.go (9)

25-25: New events import is appropriate.
This import aligns with the newly introduced EventStore.


52-54: Additional event-related errors.
Naming is consistent with existing error variables (errFailedPublishEvent, etc.).


66-66: Dependency injection for EventStore.
Adding es events.EventStore to the handler is a clear approach to inject event functionality.


70-77: NewHandler now takes event store parameter.
This is a clean handshake between the handler and the event store.


106-117: Publishing event on AuthPublish.
Invoking h.es.Publish after permissions are confirmed is logical. Add logs if needed for debugging event issues.


118-118: Trivial return statement.
This simply returns nil. No issues here.


176-176: Topic parsing in Publish method.
Acquiring chanID and subtopic via parseTopic is consistent. No issues.

Also applies to: 178-178


234-237: Publishing event after message broker publish.
This ensures telemetry or subsequent tracking. Implementation looks good.


258-282: ⚠️ Potential issue

Call to Subscribe in Unsubscribe flow.
This is likely an error; unsubscribing should call an Unsubscribe method in the event store.

Here’s the recommended fix:

-	if err := h.es.Subscribe(ctx, clientID, channelID, subTopic); err != nil {
-		return errors.Wrap(errFailedSubscribeEvent, err)
-	}
+	if err := h.es.Unsubscribe(ctx, clientID, channelID, subTopic); err != nil {
+		return errors.Wrap(errFailedUnsubscribeEvent, err)
+	}

Likely invalid or redundant comment.

mqtt/events/events.go (2)

8-15: LGTM! Well-structured event type constants.

The constants follow a clear naming pattern with a common prefix, making the event types easily identifiable and maintainable.


30-34: LGTM! Consistent field mapping in Encode method.

The field mapping follows a clear pattern and includes all necessary fields.

http/events/streams.go (1)

15-19: LGTM! Clean interface design.

The EventStore interface is well-defined with clear method signatures.

http/mocks/events.go (1)

1-66: LGTM! Well-structured mock implementation.

The auto-generated mock follows best practices and includes proper error handling and cleanup.

journal/postgres/init.go (1)

40-40: LGTM! Appropriate use of GIN index for array column.

The GIN index is the right choice for optimizing queries that search within the subscriptions array.

journal/middleware/authorization.go (1)

73-73: LGTM! Consistent authorization pattern.

The change from session.UserID to session.DomainUserID aligns with the domain-based authorization pattern used elsewhere in the file, making the authorization checks more consistent across methods.

coap/events/stream.go (1)

17-20: LGTM! Clean struct definition.

The eventStore struct has a clean design with clear separation of concerns between event publishing and service operations.

pkg/sdk/health_test.go (1)

26-26: LGTM! Proper test setup.

The change correctly captures all return values from setupMessages() and maintains proper cleanup with defer.

ws/mocks/events.go (1)

72-84: LGTM! Clean mock factory implementation.

The NewEventStore factory function is well-implemented with proper cleanup and expectations handling.

mqtt/mocks/events.go (1)

18-100: LGTM! Well-structured mock implementation.

The EventStore mock implementation is well-structured with:

  • Consistent error handling across all methods
  • Proper type assertions for return values
  • Comprehensive panic checks for unspecified return values
cmd/coap/main.go (2)

51-51: LGTM! Configuration follows established patterns.

The ESURL environment variable is well-defined with:

  • Consistent naming convention
  • Reasonable default value
  • Proper type definition

148-155: LGTM! Proper middleware integration with error handling.

The event store middleware integration is well-implemented with:

  • Correct initialization order
  • Proper error handling
  • Clear error messaging
journal/mocks/repository.go (1)

Line range hint 19-125: LGTM! Comprehensive mock implementation for telemetry tracking.

The Repository mock implementation is well-structured with:

  • Proper subscription management methods
  • Message counting functionality
  • Consistent error handling
  • Clear method signatures
ws/api/endpoint_test.go (2)

44-46: LGTM! Clean function signature update.

The function now correctly returns the PubSub mock, allowing for better test verification.


101-101: Verify EventStore mock expectations.

The EventStore mock is created but there are no expectations set on it. Consider adding mock expectations to verify the EventStore interactions.

Also applies to: 105-105

cmd/ws/main.go (1)

59-59: LGTM! Configuration follows established pattern.

The ESURL configuration field follows the project's configuration pattern and provides a sensible default.

http/api/endpoint_test.go (1)

44-44: LGTM! Service creation properly updated.

The newService function signature has been correctly updated to include the EventStore dependency.

Also applies to: 46-46

cmd/http/main.go (2)

63-63: LGTM! Environment variable configuration for event store.

The ESURL field follows the established pattern for URL configurations with appropriate default value.


146-151: LGTM! Event store initialization with proper error handling.

The initialization follows the established pattern of other service initializations with appropriate error handling and logging.

http/handler.go (3)

53-53: LGTM! Error constant for event publishing failures.

The error constant follows the established pattern and provides a clear message.


69-69: LGTM! Handler struct and constructor updates.

The event store field is properly added to the handler struct and initialized in the constructor.

Also applies to: 73-81


199-201: LGTM! Event publishing with proper error handling.

The event store publish call is properly added with appropriate error wrapping.

mqtt/handler.go (4)

56-58: LGTM! Error constants for event store operations.

The error constants follow the established pattern and provide clear messages.


113-113: LGTM! Enhanced connect event with user identification.

The connect event now includes both username and ID for better tracking.


174-176: LGTM! Topic parsing and event publishing.

The topic parsing is properly extracted into a utility function and event publishing includes appropriate error handling.

Also applies to: 191-193


226-240: LGTM! Event store unsubscription handling.

The unsubscribe operation properly validates topics and handles event store unsubscription.

http/handler_test.go (2)

62-62: LGTM! Test setup with event store mock.

The event store mock is properly initialized and integrated into the handler setup.

Also applies to: 69-70, 78-81


Line range hint 153-371: LGTM! Comprehensive test coverage for event publishing.

The test cases cover both success and failure scenarios for event store publishing.

mqtt/handler_test.go (4)

542-542: Add assertions for Disconnect parameters.

The Disconnect mock is set up but doesn't verify the username parameter.

Consider adding assertions for the username parameter to ensure it's passed correctly:

-svcCall := eventStore.On("Disconnect", mock.Anything, sessionClient.Username, sessionClient.ID).Return(tc.err)
+svcCall := eventStore.On("Disconnect", mock.Anything, mock.MatchedBy(func(username interface{}) bool {
+    return username == sessionClient.Username
+}), sessionClient.ID).Return(tc.err)

150-150: Verify error handling in Connect mock.

The mock setup for eventStore.Connect only returns the error but doesn't verify the input parameters.

Consider adding expectations for the context and clientID parameters:

-svcCall := eventStore.On("Connect", mock.Anything, mock.Anything, clientID).Return(tc.err)
+svcCall := eventStore.On("Connect", mock.Anything, mock.MatchedBy(func(params ...interface{}) bool {
+    return len(params) == 2 && params[1] == clientID
+}), clientID).Return(tc.err)

414-418: Add assertions for Publish parameters.

The Publish mock is set up but doesn't verify the topic parameter.

Consider adding assertions for the topic parameter to ensure it's passed correctly:

-eventsCall := eventStore.On("Publish", mock.Anything, clientID, mock.Anything, mock.Anything).Return(nil)
+eventsCall := eventStore.On("Publish", mock.Anything, clientID, tc.topic, mock.MatchedBy(func(payload interface{}) bool {
+    return payload != nil
+})).Return(nil)

298-302: Add assertions for Subscribe parameters.

The Subscribe mock is set up but doesn't verify the channelID and subtopic parameters.

Consider adding assertions for the channelID and subtopic parameters to ensure they're passed correctly:

-eventsCall := eventStore.On("Subscribe", mock.Anything, clientID1, mock.Anything, mock.Anything).Return(nil)
+eventsCall := eventStore.On("Subscribe", mock.Anything, clientID1, tc.channelID, mock.MatchedBy(func(subtopic interface{}) bool {
+    return subtopic != nil
+})).Return(nil)
✅ Verification successful

Review suggestion is valid and beneficial

The review correctly identifies that the test should verify the channelID and subtopic parameters passed to EventStore.Subscribe. The suggested mock verification would ensure:

  • The channelID from test case is correctly passed through
  • The subtopic parameter is properly handled
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Find test case structure and AuthSubscribe implementation
echo "=== Test Cases ==="
rg "type.*test.*struct" mqtt/handler_test.go -A 10

echo -e "\n=== Test Setup ==="
rg "func Test.*AuthSubscribe" mqtt/handler_test.go -A 20

echo -e "\n=== AuthSubscribe Implementation ==="
ast-grep --pattern 'func (*_) AuthSubscribe($_) $_{ $$$ }'

echo -e "\n=== Similar Mock Verifications ==="
rg "On\(\"Subscribe\"" mqtt/handler_test.go -B 2 -A 2

Length of output: 1584


Script:

#!/bin/bash
# Find AuthSubscribe implementation and topic handling
echo "=== AuthSubscribe Implementation ==="
rg "func.*AuthSubscribe" -A 20

echo -e "\n=== Topic/Subtopic Handling ==="
rg "func.*(Subscribe|ParseTopic|ExtractSubtopic)" -A 10

Length of output: 42478

ws/events/stream.go (1)

15-20: LGTM! Well-defined interface.

The EventStore interface is clean and follows good design principles with clear method signatures.

docker/docker-compose.yml (1)

1064-1064: Verify SMQ_ES_URL configuration across services.

The SMQ_ES_URL environment variable is added to multiple services but its configuration might need to be synchronized.

Run this script to check for consistency in the ES URL configuration:

Also applies to: 1151-1151, 1229-1229

✅ Verification successful

SMQ_ES_URL configuration is properly synchronized across services

All services consistently reference ${SMQ_ES_URL}, which is correctly defined in docker/.env and maps to a common message broker URL. The configuration follows environment variable best practices with proper variable chaining.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if SMQ_ES_URL is consistently configured across services

# Check if SMQ_ES_URL is defined in the environment file
grep "SMQ_ES_URL" .env

# List all services that use SMQ_ES_URL
echo "Services using SMQ_ES_URL:"
rg "SMQ_ES_URL:" docker-compose.yml

Length of output: 218


Script:

#!/bin/bash
# Find all .env files
echo "=== Searching for .env files ==="
fd -H "\.env"

echo -e "\n=== Checking SMQ_ES_URL in docker-compose.yml ==="
cat docker/docker-compose.yml | grep -A 1 "SMQ_ES_URL:"

echo -e "\n=== Searching for SMQ_ES_URL across all files ==="
rg "SMQ_ES_URL" --type-not git

Length of output: 1128


Script:

#!/bin/bash
echo "=== Checking SMQ_ES_URL in docker/.env ==="
grep "SMQ_ES_URL" docker/.env

Length of output: 161

Comment on lines 174 to 243
func toClientEvent(journal Journal) (clientEvent, error) {
var createdAt time.Time
var err error
id, ok := journal.Attributes["id"].(string)
if !ok {
return clientEvent{}, fmt.Errorf("invalid id attribute")
}
domain, ok := journal.Attributes["domain"].(string)
if !ok {
return clientEvent{}, fmt.Errorf("invalid domain attribute")
}
createdAtStr := journal.Attributes["created_at"].(string)
if createdAtStr == "" {
createdAt, err = time.Parse(time.RFC3339, createdAtStr)
if err != nil {
return clientEvent{}, fmt.Errorf("invalid created_at format")
}
}
return clientEvent{
id: id,
domain: domain,
createdAt: createdAt,
}, nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Reverse the createdAtStr check.
Currently, the code parses the time only if createdAtStr == "", which will never parse a non-empty string. This looks inverted.

Here is a suggested fix:

-	if createdAtStr == "" {
+	if createdAtStr != "" {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func toClientEvent(journal Journal) (clientEvent, error) {
var createdAt time.Time
var err error
id, ok := journal.Attributes["id"].(string)
if !ok {
return clientEvent{}, fmt.Errorf("invalid id attribute")
}
domain, ok := journal.Attributes["domain"].(string)
if !ok {
return clientEvent{}, fmt.Errorf("invalid domain attribute")
}
createdAtStr := journal.Attributes["created_at"].(string)
if createdAtStr == "" {
createdAt, err = time.Parse(time.RFC3339, createdAtStr)
if err != nil {
return clientEvent{}, fmt.Errorf("invalid created_at format")
}
}
return clientEvent{
id: id,
domain: domain,
createdAt: createdAt,
}, nil
}
func toClientEvent(journal Journal) (clientEvent, error) {
var createdAt time.Time
var err error
id, ok := journal.Attributes["id"].(string)
if !ok {
return clientEvent{}, fmt.Errorf("invalid id attribute")
}
domain, ok := journal.Attributes["domain"].(string)
if !ok {
return clientEvent{}, fmt.Errorf("invalid domain attribute")
}
createdAtStr := journal.Attributes["created_at"].(string)
if createdAtStr != "" {
createdAt, err = time.Parse(time.RFC3339, createdAtStr)
if err != nil {
return clientEvent{}, fmt.Errorf("invalid created_at format")
}
}
return clientEvent{
id: id,
domain: domain,
createdAt: createdAt,
}, nil
}

Comment on lines 206 to 352
func toAdapterEvent(journal Journal) (adapterEvent, error) {
clientID := journal.Attributes["client_id"].(string)
connID := journal.Attributes["conn_id"].(string)
channelID := journal.Attributes["channel_id"].(string)
topic := journal.Attributes["topic"].(string)
return adapterEvent{
clientID: clientID,
connID: connID,
channelID: channelID,
topic: topic,
}, nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add type-safety in toAdapterEvent.
Casting attributes to string without verifying they exist or have the correct type can panic if the map or type is unexpected. Safeguard with type checks:

-	clientID := journal.Attributes["client_id"].(string)
+	clientIDVal, ok := journal.Attributes["client_id"].(string)
+	if !ok {
+		return adapterEvent{}, fmt.Errorf("client_id attribute missing or invalid")
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func toAdapterEvent(journal Journal) (adapterEvent, error) {
clientID := journal.Attributes["client_id"].(string)
connID := journal.Attributes["conn_id"].(string)
channelID := journal.Attributes["channel_id"].(string)
topic := journal.Attributes["topic"].(string)
return adapterEvent{
clientID: clientID,
connID: connID,
channelID: channelID,
topic: topic,
}, nil
}
func toAdapterEvent(journal Journal) (adapterEvent, error) {
clientIDVal, ok := journal.Attributes["client_id"].(string)
if !ok {
return adapterEvent{}, fmt.Errorf("client_id attribute missing or invalid")
}
connIDVal, ok := journal.Attributes["conn_id"].(string)
if !ok {
return adapterEvent{}, fmt.Errorf("conn_id attribute missing or invalid")
}
channelIDVal, ok := journal.Attributes["channel_id"].(string)
if !ok {
return adapterEvent{}, fmt.Errorf("channel_id attribute missing or invalid")
}
topicVal, ok := journal.Attributes["topic"].(string)
if !ok {
return adapterEvent{}, fmt.Errorf("topic attribute missing or invalid")
}
return adapterEvent{
clientID: clientIDVal,
connID: connIDVal,
channelID: channelIDVal,
topic: topicVal,
}, nil
}

ws/handler.go Outdated
Comment on lines 345 to 336
func parseTopic(topic string) (string, string, error) {
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 2 {
return "", "", errors.Wrap(errFailedPublish, errMalformedTopic)
}

chanID := channelParts[1]
subtopic := channelParts[2]

subtopic, err := parseSubtopic(subtopic)
if err != nil {
return "", "", errors.Wrap(errFailedParseSubtopic, err)
}

return chanID, subtopic, nil
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

parseTopic also risks out-of-range on channelParts[2].
Here, we must require len(channelParts) >= 3 since the subtopic capture is at index 2.

-	if len(channelParts) < 2 {
+	if len(channelParts) < 3 {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func parseTopic(topic string) (string, string, error) {
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 2 {
return "", "", errors.Wrap(errFailedPublish, errMalformedTopic)
}
chanID := channelParts[1]
subtopic := channelParts[2]
subtopic, err := parseSubtopic(subtopic)
if err != nil {
return "", "", errors.Wrap(errFailedParseSubtopic, err)
}
return chanID, subtopic, nil
func parseTopic(topic string) (string, string, error) {
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 3 {
return "", "", errors.Wrap(errFailedPublish, errMalformedTopic)
}
chanID := channelParts[1]
subtopic := channelParts[2]
subtopic, err := parseSubtopic(subtopic)
if err != nil {
return "", "", errors.Wrap(errFailedParseSubtopic, err)
}
return chanID, subtopic, nil

ws/handler.go Outdated
Comment on lines 141 to 135
clientID, err := h.authAccess(ctx, token, topic, connections.Subscribe)
if err != nil {
return err
}
channelID, subTopic, err := parseTopic(topic)
if err != nil {
return err
}
if err := h.es.Subscribe(ctx, clientID, channelID, subTopic); err != nil {
return errors.Wrap(errFailedUnsubscribeEvent, err)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Incorrect error reference for subscribe events.
Line 150 uses errFailedUnsubscribeEvent, but this is the subscribe flow.

Apply the following fix:

-	return errors.Wrap(errFailedUnsubscribeEvent, err)
+	return errors.Wrap(errFailedSubscribeEvent, err)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
clientID, err := h.authAccess(ctx, token, topic, connections.Subscribe)
if err != nil {
return err
}
channelID, subTopic, err := parseTopic(topic)
if err != nil {
return err
}
if err := h.es.Subscribe(ctx, clientID, channelID, subTopic); err != nil {
return errors.Wrap(errFailedUnsubscribeEvent, err)
}
clientID, err := h.authAccess(ctx, token, topic, connections.Subscribe)
if err != nil {
return err
}
channelID, subTopic, err := parseTopic(topic)
if err != nil {
return err
}
if err := h.es.Subscribe(ctx, clientID, channelID, subTopic); err != nil {
return errors.Wrap(errFailedSubscribeEvent, err)
}

Comment on lines 24 to 33
func (ce httpEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": ce.operation,
"channel_id": ce.channelID,
"client_id": ce.clientID,
"topic": ce.topic,
}
return val, nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add instance field to encoded output and consider adding validation.

The instance field is defined in the struct but not included in the encoded output. Also, consider adding validation for required fields.

 func (ce httpEvent) Encode() (map[string]interface{}, error) {
+	if ce.operation == "" || ce.channelID == "" || ce.clientID == "" {
+		return nil, fmt.Errorf("missing required fields")
+	}
 	val := map[string]interface{}{
 		"operation":  ce.operation,
 		"channel_id": ce.channelID,
 		"client_id":  ce.clientID,
 		"topic":      ce.topic,
+		"instance":   ce.instance,
 	}
 	return val, nil
 }

Committable suggestion skipped: line range outside the PR's diff.

cmd/http/main.go Outdated
@@ -163,7 +172,7 @@ func main() {
defer pub.Close()
pub = brokerstracing.NewPublisher(httpServerConfig, tracer, pub)

svc := newService(pub, authn, clientsClient, channelsClient, logger, tracer)
svc := newService(pub, eventStore, authn, clientsClient, channelsClient, logger, tracer)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Missing cleanup code for event store confirmed

The event store implementations (Redis, RabbitMQ, NATS) all provide Close() methods for cleanup, but the defer cleanup is missing in main.go. The suggested fix to add defer eventStore.Close() is correct.

🔗 Analysis chain

Verify event store lifecycle management.

The event store is properly initialized and passed to the service, but there's no explicit cleanup in the defer block.

Consider adding cleanup code:

 	pub = brokerstracing.NewPublisher(httpServerConfig, tracer, pub)
+	defer eventStore.Close()
 
 	svc := newService(pub, eventStore, authn, clientsClient, channelsClient, logger, tracer)

Also applies to: 202-207

🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Find eventStore type definition and initialization
ast-grep --pattern 'eventStore :=' 
rg -A 5 -B 5 'eventStore :=' cmd/http/main.go

# Look for any Close method related to event store
ast-grep --pattern 'func ($_ *EventStore) Close'
rg -A 2 'Close\(\)' 

# Check main.go for cleanup patterns
rg -A 5 'defer' cmd/http/main.go

Length of output: 59229

http/handler.go Outdated
Comment on lines 102 to 104
if err := h.es.Connect(ctx, tok); err != nil {
h.logger.Error(errors.Wrap(errFailedPublishEvent, err).Error())
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider not swallowing the event store connection error.

The error from h.es.Connect is logged but not propagated. This could hide issues with event store connectivity.

Consider propagating the error:

 	if err := h.es.Connect(ctx, tok); err != nil {
-		h.logger.Error(errors.Wrap(errFailedPublishEvent, err).Error())
+		return errors.Wrap(errFailedPublishEvent, err)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if err := h.es.Connect(ctx, tok); err != nil {
h.logger.Error(errors.Wrap(errFailedPublishEvent, err).Error())
}
if err := h.es.Connect(ctx, tok); err != nil {
return errors.Wrap(errFailedPublishEvent, err)
}

mqtt/handler.go Outdated
Comment on lines 204 to 217

for _, topic := range *topics {
channelID, subTopic, err := parseTopic(topic)
if err != nil {
return err
}
if err := h.es.Subscribe(ctx, s.Username, channelID, s.ID, subTopic); err != nil {
return errors.Wrap(ErrFailedUnsubscribeEvent, err)
}
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect error wrapping in Subscribe.

The error wrapping uses ErrFailedUnsubscribeEvent instead of ErrFailedSubscribeEvent.

Apply this fix:

 		if err := h.es.Subscribe(ctx, s.Username, channelID, s.ID, subTopic); err != nil {
-			return errors.Wrap(ErrFailedUnsubscribeEvent, err)
+			return errors.Wrap(ErrFailedSubscribeEvent, err)
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for _, topic := range *topics {
channelID, subTopic, err := parseTopic(topic)
if err != nil {
return err
}
if err := h.es.Subscribe(ctx, s.Username, channelID, s.ID, subTopic); err != nil {
return errors.Wrap(ErrFailedUnsubscribeEvent, err)
}
}
for _, topic := range *topics {
channelID, subTopic, err := parseTopic(topic)
if err != nil {
return err
}
if err := h.es.Subscribe(ctx, s.Username, channelID, s.ID, subTopic); err != nil {
return errors.Wrap(ErrFailedSubscribeEvent, err)
}
}

Comment on lines 42 to 53
// Publish issues event on WS PUBLISH.
func (es *eventStore) Publish(ctx context.Context, clientID, channelID, topic string) error {
ev := wsEvent{
clientID: clientID,
operation: clientPublish,
channelID: channelID,
topic: topic,
instance: es.instance,
}

return es.publisher.Publish(ctx, ev)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add context timeout and error wrapping.

The event methods should handle context timeout and wrap errors with more context.

Apply these improvements to all three methods:

 func (es *eventStore) Publish(ctx context.Context, clientID, channelID, topic string) error {
+    if ctx.Err() != nil {
+        return fmt.Errorf("context error in Publish: %w", ctx.Err())
+    }
+
     ev := wsEvent{
         clientID:  clientID,
         operation: clientPublish,
         channelID: channelID,
         topic:     topic,
         instance:  es.instance,
     }

-    return es.publisher.Publish(ctx, ev)
+    if err := es.publisher.Publish(ctx, ev); err != nil {
+        return fmt.Errorf("failed to publish event: %w", err)
+    }
+    return nil
 }

Also applies to: 55-66, 68-79

Comment on lines 30 to 40
func NewEventStore(ctx context.Context, url, instance string) (EventStore, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
if err != nil {
return nil, err
}

return &eventStore{
instance: instance,
publisher: publisher,
}, nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add validation for URL parameter.

The NewEventStore function should validate the URL parameter.

Add URL validation:

 func NewEventStore(ctx context.Context, url, instance string) (EventStore, error) {
+    if url == "" {
+        return nil, errors.New("url cannot be empty")
+    }
+    if instance == "" {
+        return nil, errors.New("instance cannot be empty")
+    }
+
     publisher, err := store.NewPublisher(ctx, url, streamID)
     if err != nil {
         return nil, err
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func NewEventStore(ctx context.Context, url, instance string) (EventStore, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
if err != nil {
return nil, err
}
return &eventStore{
instance: instance,
publisher: publisher,
}, nil
}
func NewEventStore(ctx context.Context, url, instance string) (EventStore, error) {
if url == "" {
return nil, errors.New("url cannot be empty")
}
if instance == "" {
return nil, errors.New("instance cannot be empty")
}
publisher, err := store.NewPublisher(ctx, url, streamID)
if err != nil {
return nil, err
}
return &eventStore{
instance: instance,
publisher: publisher,
}, nil
}

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

♻️ Duplicate comments (1)
ws/handler.go (1)

322-336: ⚠️ Potential issue

Fix potential index out of range in parseTopic.

This is the same issue as in mqtt handler - the length check should be < 3 since we access index 2.

🧹 Nitpick comments (12)
coap/events/stream.go (2)

36-53: Consider partial failure handling between service publish and event publish.
Currently, the service operation (es.svc.Publish) succeeds before the event publishing is attempted. If es.events.Publish fails, the system may end up in an inconsistent state. Consider a retry mechanism, logging, or compensating action (e.g., rolling back the underlying operation if feasible) to handle such partial failures.


75-93: Maintain consistent error and logging patterns across all subscription methods.
Unsubscribe follows the same pattern as Subscribe but includes a separate parameter for the token. Verify that error and logging patterns are consistent with Subscribe/Publish for accurate troubleshooting.

http/handler.go (2)

73-80: Enhance documentation for the new parameter.
Add a short docstring note describing es (the EventStore) in NewHandler to ensure new contributors understand its purpose.


199-202: Consider partial failure fallback for event publishing.
If h.publisher.Publish succeeds, but h.es.Publish fails, the event logs won’t capture it. You may want to log a warning or attempt a retry.

journal/journal.go (2)

186-187: Ensure atomic operations for subscription management.

Both RemoveSubscription and RemoveSubscriptionWithConnID methods should be atomic operations to prevent race conditions when multiple clients are modifying subscriptions simultaneously.

Also applies to: 189-190


192-196: Consider adding batch operations for message counting.

The current design increments messages one at a time. For high-throughput scenarios, consider adding batch operations:

-IncrementInboundMessages(ctx context.Context, clientID string) error
+IncrementInboundMessagesBatch(ctx context.Context, clientID string, count uint64) error

-IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error
+IncrementOutboundMessagesBatch(ctx context.Context, channelID, subtopic string, count uint64) error
cmd/http/main.go (1)

63-63: Consider making ESURL configurable per environment.

The default value for ESURL is hardcoded. Consider making it more configurable based on the deployment environment.

-ESURL         string  `env:"SMQ_ES_URL"                   envDefault:"nats://localhost:4222"`
+ESURL         string  `env:"SMQ_ES_URL"                   envDefault:"${SMQ_DEFAULT_ES_URL:=nats://localhost:4222}"`
mqtt/handler.go (1)

191-193: Consider adding retry mechanism for event store operations.

The event store publish operation could fail due to temporary network issues. Consider implementing a retry mechanism with backoff.

ws/handler.go (1)

98-107: Consider batching event store operations.

Multiple event store operations are performed sequentially. Consider batching these operations to improve performance.

coap/adapter.go (1)

Line range hint 76-103: Consider caching authorization results.

The authorization check in Subscribe could benefit from caching to reduce load on the channels service, especially for clients with multiple subscriptions.

pkg/sdk/message_test.go (1)

61-61: Add test cases for event store failures.

While the happy path is tested, consider adding test cases for:

  • Event store connection failures
  • Event publishing failures

Also applies to: 151-152

coap/api/transport.go (1)

100-105: Consider adding request tracing.

Adding trace IDs would help with debugging authentication failures across services.

 clientID, err := authenticate(m.Context(), key)
 if err != nil {
-    logger.Warn(fmt.Sprintf("Error authenticating: %s", err))
+    logger.Warn(fmt.Sprintf("Error authenticating: %s", err),
+        slog.String("trace_id", generateTraceID()))
     resp.SetCode(codes.Unauthorized)
     return
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a15d2c7 and 87413b4.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (41)
  • cmd/coap/main.go (4 hunks)
  • cmd/http/main.go (5 hunks)
  • cmd/ws/main.go (4 hunks)
  • coap/adapter.go (3 hunks)
  • coap/api/doc.go (1 hunks)
  • coap/api/logging.go (0 hunks)
  • coap/api/metrics.go (0 hunks)
  • coap/api/transport.go (7 hunks)
  • coap/events/doc.go (1 hunks)
  • coap/events/events.go (1 hunks)
  • coap/events/stream.go (1 hunks)
  • coap/tracing/adapter.go (0 hunks)
  • docker/docker-compose.yml (4 hunks)
  • go.mod (4 hunks)
  • http/api/endpoint_test.go (6 hunks)
  • http/events/doc.go (1 hunks)
  • http/events/events.go (1 hunks)
  • http/events/streams.go (1 hunks)
  • http/handler.go (5 hunks)
  • http/handler_test.go (7 hunks)
  • http/mocks/events.go (1 hunks)
  • journal/api/responses.go (2 hunks)
  • journal/journal.go (1 hunks)
  • journal/middleware/authorization.go (1 hunks)
  • journal/mocks/repository.go (2 hunks)
  • journal/postgres/init.go (1 hunks)
  • journal/postgres/telemetry.go (2 hunks)
  • journal/service.go (3 hunks)
  • mqtt/events/events.go (1 hunks)
  • mqtt/events/streams.go (2 hunks)
  • mqtt/handler.go (9 hunks)
  • mqtt/handler_test.go (7 hunks)
  • mqtt/mocks/events.go (1 hunks)
  • pkg/sdk/health_test.go (1 hunks)
  • pkg/sdk/message_test.go (5 hunks)
  • ws/api/endpoint_test.go (3 hunks)
  • ws/events/doc.go (1 hunks)
  • ws/events/events.go (1 hunks)
  • ws/events/stream.go (1 hunks)
  • ws/handler.go (11 hunks)
  • ws/mocks/events.go (1 hunks)
💤 Files with no reviewable changes (3)
  • coap/tracing/adapter.go
  • coap/api/logging.go
  • coap/api/metrics.go
🚧 Files skipped from review as they are similar to previous changes (20)
  • coap/api/doc.go
  • coap/events/doc.go
  • http/events/doc.go
  • cmd/coap/main.go
  • ws/events/doc.go
  • journal/postgres/init.go
  • ws/events/stream.go
  • coap/events/events.go
  • ws/mocks/events.go
  • pkg/sdk/health_test.go
  • http/events/events.go
  • journal/middleware/authorization.go
  • mqtt/events/events.go
  • http/events/streams.go
  • cmd/ws/main.go
  • http/mocks/events.go
  • ws/events/events.go
  • ws/api/endpoint_test.go
  • journal/api/responses.go
  • journal/service.go
🔇 Additional comments (32)
coap/events/stream.go (2)

22-34: Ensure graceful handling of publisher creation failures in other layers.
Creation of the publisher here is properly handled if an error occurs, but verify that dependant layers (e.g., CoAP handlers, or main entrypoints) are gracefully reacting to this failure path to avoid partial initialization states.

Would you like a script to scan for references to NewEventStoreMiddleware across the codebase and check if errors are consistently handled?


55-73: Double-check concurrency requirements for event publishing.
The synchronous call to es.svc.Subscribe followed by es.events.Publish might have concurrency implications if multiple clients attempt to subscribe simultaneously. Ensure that either the underlying svc or es.events usage handles concurrency properly, or document that concurrency must be orchestrated externally.

mqtt/mocks/events.go (5)

18-33: Validate correct usage of three-argument Connect in tests.
Because a panic is raised if no return value is specified, ensure each test that calls _m.Connect provides a proper return value in the mock.


36-52: Match the new Disconnect signature where used.
Similar to the Connect method, verify that every caller of _m.Disconnect correctly supplies the connID argument and sets an appropriate mock response.


54-70: Mock method behavior looks correct.
The Publish signature and usage of _m.Called appear correct. The panic for unspecified return values aligns with testify/mock defaults.


72-88: Maintain consistency in subtopic-based tests.
Because Subscribe includes an additional subtopic argument, confirm that all test scenarios supply a subtopic (even if empty) to avoid unexpected panics.


Line range hint 90-106: Ensure thorough coverage of Unsubscribe in tests.
The Unsubscribe method now accepts five parameters. Be mindful to provide mock return values in all test cases, especially if Unsubscribe is invoked with corner cases (e.g., empty subtopic).

http/handler.go (3)

21-21: Confirm import path correctness.
The import of "github.com/absmach/supermq/http/events" is straightforward, but ensure that your local modules or build scripts reference the correct package path for the newly introduced events package.


53-53: Error usage is consistent.
errFailedPublishEvent matches the naming convention for other error variables in this file.


69-69: Injecting EventStore via dependency is a solid design choice.
Storing es events.EventStore in the handler fosters testability and clear separation of concerns.

mqtt/events/streams.go (8)

17-21: LGTM! Well-defined interface for MQTT event handling.

The interface provides a comprehensive set of methods for handling MQTT events with consistent error handling.


26-27: LGTM! Clean and focused struct definition.

The struct contains only the necessary fields for event publishing functionality.


40-40: LGTM! Proper initialization of the event store.

The function correctly initializes the publisher and handles potential errors.


69-79: Verify the clientPublish constant.

The implementation looks good, but let's verify the definition of the clientPublish constant to ensure it matches the expected operation type.

✅ Verification successful

✓ The clientPublish constant is correctly defined and used

The constant is properly defined in mqtt/events/events.go as mqttPrefix + ".client_publish" and is correctly used in the Publish method. This follows the consistent pattern used across all modules in the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for the clientPublish constant definition
rg -A 1 'const.*clientPublish'

Length of output: 32


Script:

#!/bin/bash
# Try different patterns to find clientPublish
echo "=== Searching for clientPublish occurrences ==="
rg "clientPublish" -A 2 -B 2

echo -e "\n=== Searching for type declarations that might contain clientPublish ==="
rg "type.*operation" -A 5 -B 1

Length of output: 2671


57-65: Verify the clientDisconnect constant.

The implementation looks good, but let's verify the definition of the clientDisconnect constant to ensure it matches the expected operation type.

✅ Verification successful

The clientDisconnect constant is properly defined and used correctly

The constant is defined in mqtt/events/events.go as clientDisconnect = mqttPrefix + ".client_disconnect" and follows the same pattern as other MQTT operation constants in the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for the clientDisconnect constant definition
rg -A 1 'const.*clientDisconnect'

Length of output: 35


Script:

#!/bin/bash
# Try different patterns to locate the constant
echo "=== Searching for clientDisconnect usage ==="
rg "clientDisconnect"

echo -e "\n=== Searching for operation type definitions ==="
rg "type.+operation"

echo -e "\n=== Looking for operation-related constants ==="
rg "operation\s+=|operation:"

Length of output: 2122


45-53: Verify the clientConnect constant.

The implementation looks good, but let's verify the definition of the clientConnect constant to ensure it matches the expected operation type.

✅ Verification successful

✓ The clientConnect constant is properly defined and used correctly

The constant is defined in mqtt/events/events.go as clientConnect = mqttPrefix + ".client_connect", following the consistent pattern used for other MQTT events. Its usage in the Connect method is appropriate for MQTT connection events.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for the clientConnect constant definition
rg -A 1 'const.*clientConnect'

Length of output: 32


Script:

#!/bin/bash
# Search for any occurrence of clientConnect
rg -B 2 -A 2 'clientConnect'

# Search for operation-related type definitions or constants
rg -B 2 -A 2 'type.*operation'

# Check the specific file for related constants
cat mqtt/events/streams.go

Length of output: 4163


96-106: Verify the clientUnsubscribe constant.

The implementation looks good, but let's verify the definition of the clientUnsubscribe constant to ensure it matches the expected operation type.

✅ Verification successful

The clientUnsubscribe constant is correctly defined and used.

The constant is properly defined in mqtt/events/events.go as clientUnsubscribe = mqttPrefix + ".client_unsubscribe" and is being used correctly in the Unsubscribe method.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for the clientUnsubscribe constant definition
rg -A 1 'const.*clientUnsubscribe'

Length of output: 36


Script:

#!/bin/bash
# Try different patterns to locate clientUnsubscribe
echo "=== Searching for any occurrence of clientUnsubscribe ==="
rg "clientUnsubscribe" -A 2 -B 2

echo "=== Searching for type definitions that might contain this value ==="
rg "type.*(operation|Operation)" -A 5 -B 1

echo "=== Looking for variable declarations ==="
ast-grep --pattern "clientUnsubscribe = $_"

Length of output: 5542


82-93: Verify the clientSubscribe constant.

The implementation looks good, but let's verify the definition of the clientSubscribe constant to ensure it matches the expected operation type.

✅ Verification successful

The clientSubscribe constant is properly defined and used.

The constant is correctly defined as "mqtt.client_subscribe" and follows the consistent pattern used for all MQTT operation constants in the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for the clientSubscribe constant definition
rg -A 1 'const.*clientSubscribe'

Length of output: 34


Script:

#!/bin/bash
# Try different patterns to find the constant
echo "=== Searching for operation type definitions ==="
rg -g '*.go' 'type.*Operation'

echo -e "\n=== Searching for subscribe-related constants ==="
rg -g '*.go' '(const|var).*(subscribe|Subscribe)'

echo -e "\n=== Searching for any occurrence of subscribe in the same file ==="
rg -g 'mqtt/events/streams.go' 'subscribe|Subscribe'

Length of output: 2033


Script:

#!/bin/bash
# Search for operation-related constants in mqtt/events
echo "=== Searching for operation constants in mqtt/events ==="
rg -g 'mqtt/events/*.go' '(const|var|type).*(operation|Operation)'

echo -e "\n=== Checking content of mqtt/events directory ==="
fd . mqtt/events/ -e go -x cat {}

Length of output: 4291

journal/postgres/telemetry.go (1)

187-207: Performance optimization needed for subscription matching.

The current implementation using LIKE with wildcards can be slow on large datasets. Consider using a more efficient matching strategy.

go.mod (1)

11-11: Verify compatibility with pre-release version of authzed-go.

The update to authzed-go v1.2.2-0.20250107172318-7fd4159ab2b7 includes a pre-release version. Ensure this version is stable and compatible with your use case.

Also applies to: 13-13

✅ Verification successful

Pre-release version usage is intentional and consistent with experimental APIs

The codebase consistently uses ClientWithExperimental and NewClientWithExperimentalAPIs across all services, indicating that the pre-release version is specifically chosen to support these experimental API features. The implementation follows documented patterns and shows no signs of compatibility issues.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for any breaking changes in authzed-go usage
rg -l 'github.com/authzed/authzed-go' --type go

Length of output: 260


Script:

#!/bin/bash
# Check specific imports and API usage from authzed-go
echo "=== Checking imports ==="
rg '^import.*github.com/authzed/authzed-go' -A 5 --type go

echo -e "\n=== Checking API usage patterns ==="
rg 'authzed\.' -A 2 --type go

echo -e "\n=== Checking specific client initialization ==="
ast-grep --pattern 'authzed.NewClient($$$)'

Length of output: 3992

cmd/http/main.go (1)

146-151: Add cleanup for event store resources.

The event store is initialized but missing cleanup code. Add a defer statement to ensure proper resource cleanup.

mqtt/handler.go (1)

56-58: LGTM: Error variables are well-defined and follow consistent naming.

The new error variables for event-related failures are descriptive and follow the established pattern.

ws/handler.go (1)

52-54: LGTM: Error variables follow consistent pattern.

The error variables are well-defined and match the pattern used in mqtt handler.

http/handler_test.go (2)

69-70: LGTM: Clean mock initialization.

The event store mock is properly initialized and follows the established pattern for other mocks.

Also applies to: 78-81


335-348: Good test coverage for event store failures.

The test case properly verifies error handling when event store publish fails.

docker/docker-compose.yml (2)

Line range hint 178-182: LGTM: Redis configuration follows best practices.

The Redis service is properly configured with volume persistence and network settings.


1064-1064: Ensure consistent ES_URL configuration.

The SMQ_ES_URL environment variable is consistently added to all adapter services.

Also applies to: 1151-1151, 1229-1229

coap/adapter.go (3)

29-36: LGTM! Interface changes improve client identification.

The change from key to clientID in method signatures provides better clarity and aligns with telemetry tracking requirements.


Line range hint 104-124: LGTM! Proper error handling in Unsubscribe.

The error handling and authorization checks are implemented correctly.


Line range hint 57-74: Verify authorization checks for publish operations.

The authorization logic looks correct, but let's verify all publish operations are properly authorized.

✅ Verification successful

Authorization checks for publish operations are properly implemented

The CoAP adapter correctly implements authorization by:

  • Verifying client permissions through channels.Authorize before publishing
  • Using appropriate client type and operation type parameters
  • Properly handling authorization failures
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any publish operations that bypass authorization

# Search for Publish method calls without prior authorization
ast-grep --pattern 'Publish($$$) {
  $$$
  if !authzRes.Authorized {
    $$$
  }
  $$$
}'

Length of output: 91


Script:

#!/bin/bash
# Search for Publish method implementations and their authorization patterns

# Find all Publish method implementations
echo "=== Publish Methods ==="
ast-grep --pattern 'func $_($_ context.Context, $$$) $_ {
  $$$
}'

# Search for authorization-related patterns
echo -e "\n=== Authorization Patterns ==="
rg -n "Authorize|Authorization" --type go

# Search specifically for publish operations
echo -e "\n=== Publish Operations ==="
rg -n "\.Publish\(" --type go

Length of output: 66316


Script:

#!/bin/bash
# First find files containing Publish methods
echo "=== Files with Publish methods ==="
rg --files-with-matches "func.*Publish"

# Then look for authorization patterns in publish operations
echo -e "\n=== Authorization in Publish methods ==="
rg -A 5 "func.*Publish" --type go

Length of output: 33343

coap/api/transport.go (1)

227-239: LGTM! Clean authentication implementation.

The authentication function properly:

  • Handles gRPC client errors
  • Wraps errors with domain-specific types
  • Returns client ID for successful authentication
journal/mocks/repository.go (1)

Line range hint 19-125: LGTM! Comprehensive mock implementations for telemetry tracking.

The mock implementations properly support:

  • Subscription management
  • Message counting
  • Error handling

http/handler.go Outdated
Comment on lines 102 to 104
if err := h.es.Connect(ctx, tok); err != nil {
return errors.Wrap(errFailedPublishEvent, err)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix mismatch with Connect signature from mocks.
The es.Connect in mocks requires three parameters: (ctx, clientID, connID). Here, only two are supplied (ctx, tok). Decide whether tok is truly the connID or if you need a distinct clientID parameter.

-if err := h.es.Connect(ctx, tok); err != nil {
+if err := h.es.Connect(ctx, clientID, tok); err != nil {

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 183 to 184
// AddSubscription adds a subscription to the client telemetry.
AddSubscription(ctx context.Context, clientID, sub string) error
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Empty parameter validation is indeed missing and should be added

The implementation in journal/postgres/telemetry.go directly uses the parameters in the SQL query without any validation. Neither the service layer nor the repository layer validates these parameters, which could lead to data quality issues. Consider adding validation at the repository layer:

if clientID == "" {
    return fmt.Errorf("clientID cannot be empty")
}
if sub == "" {
    return fmt.Errorf("subscription cannot be empty")
}
🔗 Analysis chain

Consider adding validation for empty parameters.

The AddSubscription method should validate that neither clientID nor sub are empty strings to prevent invalid data in the database.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if parameter validation exists in the implementation
rg -A 5 'AddSubscription.*\{' --type go

Length of output: 858


Script:

#!/bin/bash
# Check database schema for constraints and similar methods with validation
rg -g '*.sql' 'CREATE TABLE.*clients_telemetry' -A 10
rg 'func.*\(.*string\).*error.*\{.*if.*==""' --type go
rg 'AddSubscription.*\(' -B 5 --type go

Length of output: 1747


Script:

#!/bin/bash
# Check toAdapterEvent implementation and related validation
ast-grep --pattern 'func toAdapterEvent($$$) {
  $$$
}'
rg 'type.*AdapterEvent' -A 5 --type go

Length of output: 98

Comment on lines 83 to 98
func (repo *repository) AddSubscription(ctx context.Context, clientID, sub string) error {
q := `
UPDATE clients_telemetry
SET subscriptions = ARRAY_APPEND(subscriptions, :subscriptions),
last_seen = :last_seen
WHERE client_id = :client_id;
`
ct := journal.ClientTelemetry{
ClientID: clientID,
Subscriptions: []string{sub},
LastSeen: time.Now(),
}
dbct, err := toDBClientsTelemetry(ct)
if err != nil {
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}

result, err := repo.db.NamedExecContext(ctx, q, dbct)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}

return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add transaction support for subscription management.

The AddSubscription method should use a transaction to ensure atomicity when updating both subscriptions and last_seen timestamp.

 func (repo *repository) AddSubscription(ctx context.Context, clientID, sub string) error {
+    tx, err := repo.db.BeginTxx(ctx, nil)
+    if err != nil {
+        return errors.Wrap(repoerr.ErrUpdateEntity, err)
+    }
+    defer tx.Rollback()
+
     q := `
         UPDATE clients_telemetry
         SET subscriptions = ARRAY_APPEND(subscriptions, :subscriptions),
             last_seen = :last_seen
         WHERE client_id = :client_id;
     `
     // ... rest of the code ...
-    result, err := repo.db.NamedExecContext(ctx, q, dbct)
+    result, err := tx.NamedExecContext(ctx, q, dbct)
     if err != nil {
         return postgres.HandleError(repoerr.ErrUpdateEntity, err)
     }
+    return tx.Commit()
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (repo *repository) AddSubscription(ctx context.Context, clientID, sub string) error {
q := `
UPDATE clients_telemetry
SET subscriptions = ARRAY_APPEND(subscriptions, :subscriptions),
last_seen = :last_seen
WHERE client_id = :client_id;
`
ct := journal.ClientTelemetry{
ClientID: clientID,
Subscriptions: []string{sub},
LastSeen: time.Now(),
}
dbct, err := toDBClientsTelemetry(ct)
if err != nil {
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}
result, err := repo.db.NamedExecContext(ctx, q, dbct)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}
return nil
}
func (repo *repository) AddSubscription(ctx context.Context, clientID, sub string) error {
tx, err := repo.db.BeginTxx(ctx, nil)
if err != nil {
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}
defer tx.Rollback()
q := `
UPDATE clients_telemetry
SET subscriptions = ARRAY_APPEND(subscriptions, :subscriptions),
last_seen = :last_seen
WHERE client_id = :client_id;
`
ct := journal.ClientTelemetry{
ClientID: clientID,
Subscriptions: []string{sub},
LastSeen: time.Now(),
}
dbct, err := toDBClientsTelemetry(ct)
if err != nil {
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}
result, err := tx.NamedExecContext(ctx, q, dbct)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}
if err := tx.Commit(); err != nil {
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}
return nil
}

Comment on lines 229 to 230
eventsCall := es.On("Connect", mock.Anything, mock.Anything).Return(nil)
eventsCall1 := es.On("Publish", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add assertions for event store method parameters.

The mock expectations for Connect and Publish should include parameter assertions to ensure they are called with the correct arguments.

-eventsCall := es.On("Connect", mock.Anything, mock.Anything).Return(nil)
+eventsCall := es.On("Connect", mock.Anything, mock.MatchedBy(func(cfg events.Config) bool {
+    return cfg.InstanceID != "" && cfg.URL != ""
+})).Return(nil)

-eventsCall1 := es.On("Publish", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+eventsCall1 := es.On("Publish", mock.Anything, 
+    mock.MatchedBy(func(eventType string) bool { return eventType != "" }),
+    mock.MatchedBy(func(clientID string) bool { return clientID != "" }),
+    mock.MatchedBy(func(data interface{}) bool { return data != nil }),
+).Return(nil)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
eventsCall := es.On("Connect", mock.Anything, mock.Anything).Return(nil)
eventsCall1 := es.On("Publish", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
eventsCall := es.On("Connect", mock.Anything, mock.MatchedBy(func(cfg events.Config) bool {
return cfg.InstanceID != "" && cfg.URL != ""
})).Return(nil)
eventsCall1 := es.On("Publish", mock.Anything,
mock.MatchedBy(func(eventType string) bool { return eventType != "" }),
mock.MatchedBy(func(clientID string) bool { return clientID != "" }),
mock.MatchedBy(func(data interface{}) bool { return data != nil }),
).Return(nil)

mqtt/handler.go Outdated
Comment on lines 204 to 217

for _, topic := range *topics {
channelID, subTopic, err := parseTopic(topic)
if err != nil {
return err
}
if err := h.es.Subscribe(ctx, s.Username, channelID, s.ID, subTopic); err != nil {
return errors.Wrap(ErrFailedSubscribeEvent, err)
}
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling in Subscribe method.

The method should validate topics before processing:

  1. Add nil check for topics parameter
  2. Consider adding length validation for topics

Apply these improvements:

 func (h *handler) Subscribe(ctx context.Context, topics *[]string) error {
     s, ok := session.FromContext(ctx)
     if !ok {
         return errors.Wrap(ErrFailedSubscribe, ErrClientNotInitialized)
     }
+    if topics == nil || *topics == nil {
+        return errors.Wrap(ErrFailedSubscribe, ErrMissingTopicSub)
+    }
+    if len(*topics) == 0 {
+        return nil
+    }

Committable suggestion skipped: line range outside the PR's diff.

mqtt/handler.go Outdated
Comment on lines 174 to 176
chanID, subtopic, err := parseTopic(*topic)
if err != nil {
return errors.Wrap(ErrFailedParseSubtopic, err)
return err
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential nil pointer dereference in parseTopic.

The function is called with *topic without checking if topic is nil, which could lead to a panic.

Add a nil check before dereferencing:

+    if topic == nil {
+        return "", "", errors.Wrap(ErrFailedPublish, ErrMissingTopicPub)
+    }
     chanID, subtopic, err := parseTopic(*topic)

Committable suggestion skipped: line range outside the PR's diff.

mqtt/handler.go Outdated
Comment on lines 283 to 308
func parseTopic(topic string) (string, string, error) {
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 2 {
return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
}

chanID := channelParts[1]
subtopic := channelParts[2]

subtopic, err := parseSubtopic(subtopic)
if err != nil {
return "", "", errors.Wrap(ErrFailedParseSubtopic, err)
}

return chanID, subtopic, nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve error handling in parseTopic.

The function needs additional validation:

  1. The length check should be < 3 since we access index 2
  2. Consider adding bounds checking for the channel ID length

Apply these improvements:

 func parseTopic(topic string) (string, string, error) {
     channelParts := channelRegExp.FindStringSubmatch(topic)
-    if len(channelParts) < 2 {
+    if len(channelParts) < 3 {
         return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
     }
 
     chanID := channelParts[1]
+    if len(chanID) == 0 {
+        return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
+    }
     subtopic := channelParts[2]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func parseTopic(topic string) (string, string, error) {
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 2 {
return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
}
chanID := channelParts[1]
subtopic := channelParts[2]
subtopic, err := parseSubtopic(subtopic)
if err != nil {
return "", "", errors.Wrap(ErrFailedParseSubtopic, err)
}
return chanID, subtopic, nil
}
func parseTopic(topic string) (string, string, error) {
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 3 {
return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
}
chanID := channelParts[1]
if len(chanID) == 0 {
return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
}
subtopic := channelParts[2]
subtopic, err := parseSubtopic(subtopic)
if err != nil {
return "", "", errors.Wrap(ErrFailedParseSubtopic, err)
}
return chanID, subtopic, nil
}

mqtt/handler_test.go Outdated Show resolved Hide resolved
dependabot bot and others added 11 commits January 28, 2025 12:07
…318-7fd4159ab2b7 to 1.3.0 (absmach#2668)

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: Felix Gateru <[email protected]>
Signed-off-by: Felix Gateru <[email protected]>
Signed-off-by: Felix Gateru <[email protected]>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 13

♻️ Duplicate comments (3)
cmd/http/main.go (1)

169-175: ⚠️ Potential issue

Add cleanup code for event store middleware.

The event store middleware is initialized but there's no cleanup code in the defer block.

mqtt/handler.go (1)

207-217: 🛠️ Refactor suggestion

Improve error handling in Subscribe method.

The error handling in the Subscribe method needs improvement:

  1. Missing nil check for topics parameter
  2. Missing length validation for topics
journal/postgres/telemetry.go (1)

83-98: 🛠️ Refactor suggestion

Add transaction support for subscription management.

The AddSubscription method should use a transaction to ensure atomicity.

Apply this diff:

 func (repo *repository) AddSubscription(ctx context.Context, sub journal.ClientSubscription) error {
+    tx, err := repo.db.BeginTxx(ctx, nil)
+    if err != nil {
+        return postgres.HandleError(repoerr.ErrUpdateEntity, err)
+    }
+    defer tx.Rollback()
+
     q := `INSERT INTO subscriptions (id, subscriber_id, channel_id, subtopic, client_id)
         VALUES (:id, :subscriber_id, :channel_id, :subtopic, :client_id);
     `

-    result, err := repo.db.NamedExecContext(ctx, q, sub)
+    result, err := tx.NamedExecContext(ctx, q, sub)
     if err != nil {
         return postgres.HandleError(repoerr.ErrUpdateEntity, err)
     }

     if rows, _ := result.RowsAffected(); rows == 0 {
         return repoerr.ErrNotFound
     }

-    return nil
+    return tx.Commit()
 }
🧹 Nitpick comments (4)
mqtt/handler_test.go (1)

Line range hint 1-543: Add concurrent test cases for telemetry operations.

Since telemetry aggregation involves tracking multiple client operations, consider adding test cases that verify thread safety and correct behavior under concurrent scenarios.

Example test scenario:

func TestConcurrentSubscribe(t *testing.T) {
    handler := newHandler()
    const numGoroutines = 10
    var wg sync.WaitGroup
    
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            ctx := session.NewContext(context.TODO(), &sessionClient)
            err := handler.Subscribe(ctx, &topics)
            assert.NoError(t, err)
        }(i)
    }
    wg.Wait()
}
pkg/messaging/events/publisher.go (1)

33-45: Consider adding context cancellation check

The Publish method should check for context cancellation before proceeding with operations.

 func (es *publisherES) Publish(ctx context.Context, topic string, msg *messaging.Message) error {
+    if ctx.Err() != nil {
+        return ctx.Err()
+    }
     if err := es.pub.Publish(ctx, topic, msg); err != nil {
         return err
     }
pkg/messaging/events/events.go (1)

20-24: Consider making struct fields exportable

The publishEvent struct fields are unexported, which might limit their usage in external packages.

 type publishEvent struct {
-    channelID string
-    clientID  string
-    subtopic  string
+    ChannelID string
+    ClientID  string
+    Subtopic  string
 }
journal/postgres/init.go (1)

38-44: Consider adding indexes for query performance.

The new subscriptions table will be queried by subscriber_id, channel_id, and client_id. Consider adding indexes to improve query performance.

Add these indexes to the migration:

 CREATE TABLE IF NOT EXISTS subscriptions (
   id              VARCHAR(36) PRIMARY KEY,
   subscriber_id   VARCHAR(1024) NOT NULL,
   channel_id      VARCHAR(36) NOT NULL,
   subtopic        VARCHAR(1024),
   client_id       VARCHAR(36),
   FOREIGN KEY (client_id) REFERENCES clients_telemetry(client_id) ON DELETE CASCADE ON UPDATE CASCADE
-);
+);
+CREATE INDEX idx_subscriptions_subscriber ON subscriptions(subscriber_id);
+CREATE INDEX idx_subscriptions_channel ON subscriptions(channel_id);
+CREATE INDEX idx_subscriptions_client ON subscriptions(client_id);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 87413b4 and 0d127da.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (24)
  • cmd/coap/main.go (3 hunks)
  • cmd/http/main.go (3 hunks)
  • cmd/mqtt/main.go (3 hunks)
  • cmd/ws/main.go (3 hunks)
  • coap/adapter.go (1 hunks)
  • docker/docker-compose.yml (3 hunks)
  • go.mod (4 hunks)
  • journal/api/responses.go (2 hunks)
  • journal/journal.go (2 hunks)
  • journal/middleware/authorization.go (1 hunks)
  • journal/mocks/repository.go (2 hunks)
  • journal/postgres/init.go (1 hunks)
  • journal/postgres/telemetry.go (4 hunks)
  • journal/service.go (3 hunks)
  • mqtt/events/events.go (1 hunks)
  • mqtt/events/streams.go (2 hunks)
  • mqtt/handler.go (5 hunks)
  • mqtt/handler_test.go (3 hunks)
  • mqtt/mocks/events.go (1 hunks)
  • pkg/messaging/events/events.go (1 hunks)
  • pkg/messaging/events/publisher.go (1 hunks)
  • pkg/messaging/events/pubsub.go (1 hunks)
  • pkg/messaging/pubsub.go (1 hunks)
  • ws/adapter.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • cmd/ws/main.go
  • journal/middleware/authorization.go
  • journal/api/responses.go
  • mqtt/mocks/events.go
  • go.mod
  • mqtt/events/streams.go
🧰 Additional context used
🪛 GitHub Check: api-test
cmd/coap/main.go

[failure] 52-52:
ESURL redeclared


[failure] 51-51:
other declaration of ESURL

cmd/http/main.go

[failure] 64-64:
ESURL redeclared


[failure] 63-63:
other declaration of ESURL

🪛 GitHub Check: Lint and Build
cmd/coap/main.go

[failure] 52-52:
ESURL redeclared


[failure] 51-51:
other declaration of ESURL (typecheck)

mqtt/handler.go

[failure] 56-56:
ErrFailedSubscribeEvent redeclared in this block (typecheck)


[failure] 55-55:
other declaration of ErrFailedSubscribeEvent (typecheck)

mqtt/events/events.go

[failure] 45-45:
syntax error: unexpected { at end of statement (typecheck)


[failure] 45-45:
expected ';', found '{' (typecheck)


[failure] 45-45:
syntax error: unexpected { at end of statement) (typecheck)

cmd/http/main.go

[failure] 64-64:
ESURL redeclared


[failure] 63-63:
other declaration of ESURL (typecheck)

🪛 GitHub Actions: CI Pipeline
cmd/coap/main.go

[error] 52-52: ESURL redeclared

🪛 golangci-lint (1.62.2)
mqtt/handler.go

56-56: ErrFailedSubscribeEvent redeclared in this block

(typecheck)


55-55: other declaration of ErrFailedSubscribeEvent

(typecheck)


293-293: parseTopic redeclared in this block

(typecheck)


276-276: other declaration of parseTopic

(typecheck)

mqtt/events/events.go

45-45: expected ';', found '{'

(typecheck)

🪛 GitHub Actions: Property Based Tests
cmd/http/main.go

[error] 64-64: ESURL redeclared

🔇 Additional comments (17)
mqtt/handler_test.go (1)

448-452: Consider adding test cases for event store failures.

The test only verifies successful event store operations. Add test cases for failure scenarios.

pkg/messaging/events/pubsub.go (1)

64-76: ⚠️ Potential issue

Add missing clientID in unsubscribe event

The unsubscribe event is missing the clientID field which is present in the subscribe event.

 func (es *pubsubES) Unsubscribe(ctx context.Context, id string, topic string) error {
     if err := es.pubsub.Unsubscribe(ctx, id, topic); err != nil {
         return err
     }
 
     se := subscribeEvent{
         operation:    clientUnsubscribe,
         subscriberID: id,
+        clientID:     "", // Need to pass clientID parameter
         topic:        topic,
     }

Likely invalid or redundant comment.

pkg/messaging/pubsub.go (1)

39-39: Verify backward compatibility

The addition of ClientID field might affect existing implementations of the SubscriberConfig struct.

✅ Verification successful

✓ Change is backward compatible

The addition of the ClientID field to SubscriberConfig is safe as it maintains backward compatibility with existing code. All current usages of the struct will continue to work without modification, as unspecified fields in Go structs are automatically zero-valued.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for existing SubscriberConfig usage to ensure compatibility
echo "Searching for SubscriberConfig usage..."
rg "SubscriberConfig{" -A 5

# Search for existing Subscribe method calls
echo "Searching for Subscribe method calls..."
rg "Subscribe\(ctx,.*SubscriberConfig{" -A 5

Length of output: 484


Script:

#!/bin/bash
# Check if this is a new file/package
echo "Checking file history..."
git log --diff-filter=A -- pkg/messaging/pubsub.go

# Search for SubscriberConfig usage with proper regex
echo -e "\nSearching for SubscriberConfig usage..."
rg "SubscriberConfig\{" --type go

# Search for SubscriberConfig type definition
echo -e "\nSearching for SubscriberConfig definition..."
rg "type SubscriberConfig struct" --type go -A 10

# Search for Subscribe method calls
echo -e "\nSearching for Subscribe method calls..."
rg "Subscribe\(.+SubscriberConfig" --type go

Length of output: 5006

ws/adapter.go (1)

78-81: LGTM! The SubscriberConfig initialization looks good.

The addition of the ClientID field enhances telemetry tracking capabilities while maintaining backward compatibility.

cmd/mqtt/main.go (2)

35-35: LGTM! Import of msgevents package.

The import aligns with the telemetry aggregation requirements.


146-151: LGTM! Event store middleware initialization.

The middleware is properly initialized with comprehensive error handling for both publishers.

Also applies to: 170-175

mqtt/events/events.go (2)

8-13: LGTM! Well-structured event constants.

The constants follow good naming conventions and provide clear event categorization.


20-34: LGTM! Well-designed connectEvent structure.

The connectEvent struct and its Encode method are properly implemented with all necessary fields for telemetry tracking.

cmd/coap/main.go (1)

149-154: LGTM! Event store middleware initialization.

The PubSub middleware is properly initialized with comprehensive error handling.

coap/adapter.go (1)

Line range hint 1-24: Verify the impact of authentication changes.

The removal of client authentication in favor of direct clientID usage could have security implications. Ensure that the clientID is properly validated upstream before reaching these methods.

Run this script to verify the authentication flow:

✅ Verification successful

Authentication flow is properly secured

The removal of client authentication in favor of direct clientID usage is safe because:

  • The clientID is obtained only after successful authentication via the gRPC clients service
  • Authentication requires both clientID and clientSecret
  • Multiple validation layers exist including secret validation and caching
  • Authorization checks follow successful authentication
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if clientID validation exists in the authentication chain
rg -A 5 "func.*Authenticate.*clientID" 
ast-grep --pattern 'func $_ ($_ context.Context, clientID string, $$$) {
  $$$
  if clientID == "" {
    $$$
  }
  $$$
}'

Length of output: 164


Script:

#!/bin/bash
# Check clientID usage in CoAP package and gRPC validation
echo "=== Checking CoAP package for clientID usage ==="
rg "clientID" --type go ./coap -B2 -A2

echo -e "\n=== Checking gRPC clients service for validation ==="
rg "func.*Client" --type go ./api/grpc/clients -B2 -A2

echo -e "\n=== Checking error handling for invalid clients ==="
rg "Invalid.*[Cc]lient|[Cc]lient.*Invalid" --type go

Length of output: 16176


Script:

#!/bin/bash
# Check authentication implementation and protobuf definitions
echo "=== Checking authentication implementation ==="
rg -g "!*.pb.go" "func.*Authenticate" --type go -B5 -A10

echo -e "\n=== Checking protobuf definitions ==="
rg -g "*.proto" "message.*Authn|service.*Client" -B2 -A5

echo -e "\n=== Checking clients service implementation ==="
fd -e go -g "**/clients/service.go" -x cat {}

Length of output: 19959

docker/docker-compose.yml (1)

1068-1068: LGTM! Event store configuration is consistent.

The SMQ_ES_URL environment variable is consistently added across all adapter services (http, coap, ws), which aligns with the event store integration changes.

Also applies to: 1155-1155, 1233-1233

journal/postgres/init.go (2)

49-49: LGTM!

The migration script correctly handles dropping the new subscriptions table.


31-37: Verify the impact of making client_id a primary key.

The change from a non-primary key to a primary key for client_id in the clients_telemetry table could affect existing data. Ensure there are no duplicate client_id values in the existing data before applying this migration.

✅ Verification successful

No concerns with client_id as primary key

This appears to be a new table creation with IF NOT EXISTS clause, not a modification of an existing table. The primary key constraint on client_id is part of the initial design and is required for the foreign key relationships that reference it. There's no risk to existing data.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for duplicate client_id values in clients_telemetry table
psql -c "SELECT client_id, COUNT(*) FROM clients_telemetry GROUP BY client_id HAVING COUNT(*) > 1;"

Length of output: 196


Script:

#!/bin/bash
# Check git history for previous schema definition and related files
rg -A 5 -B 5 "CREATE TABLE.*clients_telemetry" --type go
rg "client_id.*clients_telemetry" --type go

Length of output: 1362


Script:

#!/bin/bash
# Check git history for this file to see previous schema
git log -L :clients_telemetry:journal/postgres/init.go

Length of output: 0

journal/mocks/repository.go (1)

19-35: LGTM!

The mock implementations for the new repository methods are well-structured and consistent with the existing pattern. They properly handle return values and include panic conditions for unspecified return values.

Also applies to: 37-63, 83-99, 101-117, 119-135

journal/journal.go (2)

191-204: LGTM!

The new repository interface methods are well-documented and align with the database schema changes.


143-143: Verify the impact of changing the Subscriptions field type.

The Subscriptions field type has changed from []string to uint64. This is a breaking change that could affect existing code that expects an array of subscription strings.

journal/service.go (1)

231-237: ⚠️ Potential issue

Improve error handling in time parsing.

The time parsing code should handle the type assertion more safely.

Apply this diff:

-    createdAtStr := journal.Attributes["created_at"].(string)
-    if createdAtStr != "" {
+    createdAtStr, ok := journal.Attributes["created_at"].(string)
+    if !ok {
+        return clientEvent{}, fmt.Errorf("missing or invalid created_at attribute")
+    }
+    if createdAtStr != "" {
         createdAt, err = time.Parse(time.RFC3339, createdAtStr)
         if err != nil {
             return clientEvent{}, fmt.Errorf("invalid created_at format")
         }
     }

Likely invalid or redundant comment.

mqtt/handler_test.go Outdated Show resolved Hide resolved
Comment on lines +21 to +31
func NewPublisherMiddleware(ctx context.Context, pub messaging.Publisher, url string) (messaging.Publisher, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
if err != nil {
return nil, err
}

return &publisherES{
ep: publisher,
pub: pub,
}, nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for nil parameters

The constructor should validate that neither pub nor url is nil/empty before proceeding.

 func NewPublisherMiddleware(ctx context.Context, pub messaging.Publisher, url string) (messaging.Publisher, error) {
+    if pub == nil {
+        return nil, errors.New("publisher cannot be nil")
+    }
+    if url == "" {
+        return nil, errors.New("url cannot be empty")
+    }
     publisher, err := store.NewPublisher(ctx, url, streamID)
     if err != nil {
         return nil, err
     }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +47 to +49
func (es *publisherES) Close() error {
return es.pub.Close()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Close method should handle both publishers

The Close method only closes the underlying publisher but not the event publisher.

 func (es *publisherES) Close() error {
-    return es.pub.Close()
+    if err := es.pub.Close(); err != nil {
+        return fmt.Errorf("failed to close publisher: %w", err)
+    }
+    if closer, ok := es.ep.(io.Closer); ok {
+        if err := closer.Close(); err != nil {
+            return fmt.Errorf("failed to close event publisher: %w", err)
+        }
+    }
+    return nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (es *publisherES) Close() error {
return es.pub.Close()
}
func (es *publisherES) Close() error {
if err := es.pub.Close(); err != nil {
return fmt.Errorf("failed to close publisher: %w", err)
}
if closer, ok := es.ep.(io.Closer); ok {
if err := closer.Close(); err != nil {
return fmt.Errorf("failed to close event publisher: %w", err)
}
}
return nil
}

Comment on lines +35 to +40
type subscribeEvent struct {
operation string
subscriberID string
clientID string
topic string
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure consistent field naming across event types

The subscribeEvent uses inconsistent field naming compared to publishEvent.

 type subscribeEvent struct {
-    operation    string
-    subscriberID string
-    clientID     string
-    topic        string
+    Operation    string
+    SubscriberID string
+    ClientID     string
+    Topic        string
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
type subscribeEvent struct {
operation string
subscriberID string
clientID string
topic string
}
type subscribeEvent struct {
Operation string
SubscriberID string
ClientID string
Topic string
}

@@ -36,6 +36,7 @@ type MessageHandler interface {

type SubscriberConfig struct {
ID string
ClientID string
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add documentation for ClientID field

The new ClientID field lacks documentation explaining its purpose and requirements.

 type SubscriberConfig struct {
     ID             string
-    ClientID       string
+    // ClientID uniquely identifies the client in the messaging system.
+    // This is used for telemetry tracking and client-specific operations.
+    ClientID       string
     Topic          string
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ClientID string
// ClientID uniquely identifies the client in the messaging system.
// This is used for telemetry tracking and client-specific operations.
ClientID string

mqtt/handler.go Outdated Show resolved Hide resolved
mqtt/handler.go Outdated Show resolved Hide resolved
Comment on lines +150 to +156
type ClientSubscription struct {
ID string `json:"id" db:"id"`
SubscriberID string `json:"subscriber_id" db:"subscriber_id"`
ChannelID string `json:"channel_id" db:"channel_id"`
Subtopic string `json:"subtopic" db:"subtopic"`
ClientID string `json:"client_id" db:"client_id"`
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding validation for ClientSubscription fields.

The ClientSubscription struct should include field validation to ensure data integrity.

Add validation methods:

func (cs *ClientSubscription) Validate() error {
    if cs.ID == "" {
        return errors.New("id cannot be empty")
    }
    if cs.SubscriberID == "" {
        return errors.New("subscriber_id cannot be empty")
    }
    if cs.ChannelID == "" {
        return errors.New("channel_id cannot be empty")
    }
    return nil
}

Comment on lines 159 to 212
func (repo *repository) IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error {
query := `
SELECT client_id, COUNT(*) AS match_count
FROM subscriptions
WHERE channel_id = :channel_id AND subtopic = :subtopic
GROUP BY client_id
`
sb := journal.ClientSubscription{
ChannelID: channelID,
Subtopic: subtopic,
}

rows, err := repo.db.NamedQueryContext(ctx, query, sb)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
defer rows.Close()

tx, err := repo.db.BeginTxx(ctx, nil)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

q := `UPDATE clients_telemetry
SET outbound_messages = outbound_messages + $1
WHERE client_id = $2;
`

for rows.Next() {
var clientID string
var count uint64
if err = rows.Scan(&clientID, &count); err != nil {
err := tx.Rollback()
if err == nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
return errors.Wrap(errors.ErrRollbackTx, err)
}

if _, err = repo.db.ExecContext(ctx, q, count, clientID); err != nil {
err := tx.Rollback()
if err == nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
return errors.Wrap(errors.ErrRollbackTx, err)
}
}

if err = tx.Commit(); err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve error handling in IncrementOutboundMessages.

The error handling in the transaction rollback needs improvement. The condition is reversed, and the error wrapping could be clearer.

Apply this diff:

-            err := tx.Rollback()
-            if err == nil {
-                return postgres.HandleError(repoerr.ErrUpdateEntity, err)
+            if err := tx.Rollback(); err != nil {
+                return errors.Wrap(errors.ErrRollbackTx, err)
             }
-            return errors.Wrap(errors.ErrRollbackTx, err)
+            return postgres.HandleError(repoerr.ErrUpdateEntity, err)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (repo *repository) IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error {
query := `
SELECT client_id, COUNT(*) AS match_count
FROM subscriptions
WHERE channel_id = :channel_id AND subtopic = :subtopic
GROUP BY client_id
`
sb := journal.ClientSubscription{
ChannelID: channelID,
Subtopic: subtopic,
}
rows, err := repo.db.NamedQueryContext(ctx, query, sb)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
defer rows.Close()
tx, err := repo.db.BeginTxx(ctx, nil)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
q := `UPDATE clients_telemetry
SET outbound_messages = outbound_messages + $1
WHERE client_id = $2;
`
for rows.Next() {
var clientID string
var count uint64
if err = rows.Scan(&clientID, &count); err != nil {
err := tx.Rollback()
if err == nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
return errors.Wrap(errors.ErrRollbackTx, err)
}
if _, err = repo.db.ExecContext(ctx, q, count, clientID); err != nil {
err := tx.Rollback()
if err == nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
return errors.Wrap(errors.ErrRollbackTx, err)
}
}
if err = tx.Commit(); err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
return nil
}
func (repo *repository) IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error {
query := `
SELECT client_id, COUNT(*) AS match_count
FROM subscriptions
WHERE channel_id = :channel_id AND subtopic = :subtopic
GROUP BY client_id
`
sb := journal.ClientSubscription{
ChannelID: channelID,
Subtopic: subtopic,
}
rows, err := repo.db.NamedQueryContext(ctx, query, sb)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
defer rows.Close()
tx, err := repo.db.BeginTxx(ctx, nil)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
q := `UPDATE clients_telemetry
SET outbound_messages = outbound_messages + $1
WHERE client_id = $2;
`
for rows.Next() {
var clientID string
var count uint64
if err = rows.Scan(&clientID, &count); err != nil {
if err := tx.Rollback(); err != nil {
return errors.Wrap(errors.ErrRollbackTx, err)
}
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
if _, err = repo.db.ExecContext(ctx, q, count, clientID); err != nil {
if err := tx.Rollback(); err != nil {
return errors.Wrap(errors.ErrRollbackTx, err)
}
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
}
if err = tx.Commit(); err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
return nil
}

Comment on lines +138 to +141
topics := strings.Split(ae.topic, ".")
if len(topics) > 2 {
subtopic = topics[2]
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add validation for topic format.

The topic parsing assumes a specific format without validation.

Add validation:

     topics := strings.Split(ae.topic, ".")
+    if len(topics) < 2 {
+        return fmt.Errorf("invalid topic format: expected at least 2 parts, got %d", len(topics))
+    }
     if len(topics) > 2 {
         subtopic = topics[2]
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
topics := strings.Split(ae.topic, ".")
if len(topics) > 2 {
subtopic = topics[2]
}
topics := strings.Split(ae.topic, ".")
if len(topics) < 2 {
return fmt.Errorf("invalid topic format: expected at least 2 parts, got %d", len(topics))
}
if len(topics) > 2 {
subtopic = topics[2]
}

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (5)
cmd/coap/main.go (1)

148-154: Enhance middleware setup with observability and cleanup.

Consider the following improvements:

  1. Add graceful shutdown handling for the event store middleware
  2. Add specific error handling (e.g., connection failures vs. configuration errors)
  3. Integrate with the existing metrics and tracing setup

Example enhancement:

 nps, err = msgevents.NewPubSubMiddleware(ctx, nps, cfg.ESURL)
 if err != nil {
-    logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
+    logger.Error(fmt.Sprintf("failed to create event store middleware: %s (url: %s)", err, cfg.ESURL))
     exitCode = 1
     return
 }
+// Add metrics
+nps = brokerstracing.NewPubSub(coapServerConfig, tracer, nps)
+
+// Register cleanup
+defer func() {
+    if closer, ok := nps.(io.Closer); ok {
+        if err := closer.Close(); err != nil {
+            logger.Error(fmt.Sprintf("failed to close event store middleware: %s", err))
+        }
+    }
+}()
mqtt/handler.go (1)

238-240: Consider logging the error before returning.

For better observability, consider logging the error before returning it, similar to how it's done in the Connect method.

Apply this diff:

     if err := h.es.Disconnect(ctx, s.Username, s.ID); err != nil {
+        h.logger.Error(errors.Wrap(ErrFailedPublishDisconnectEvent, err).Error())
         return errors.Wrap(ErrFailedPublishDisconnectEvent, err)
     }
mqtt/handler_test.go (3)

150-153: Add test cases for event store failures.

The test only verifies successful event store operations. Consider adding test cases where the event store returns an error.

Add a new test case:

     {
+        desc:    "connect with event store failure",
+        session: &sessionClient,
+        authNRes: &grpcClientsV1.AuthnRes{
+            Authenticated: true,
+            Id:            clientID,
+        },
+        eventStoreErr: errors.New("event store error"),
     },

448-452: Add test cases for Subscribe event store failures.

The test should verify the behavior when the event store fails to record the subscription.

Add a new test case:

     {
+        desc:    "subscribe with event store failure",
+        session: &sessionClient,
+        topic:   topics,
+        logMsg:  fmt.Sprintf(mqtt.LogInfoSubscribed, clientID, topics[0]),
+        eventStoreErr: errors.New("event store error"),
+        err:     mqtt.ErrFailedSubscribeEvent,
     },

Line range hint 522-526: Add test cases for Disconnect event store failures.

The test should verify the behavior when the event store fails to record the disconnection.

Add a new test case:

     {
+        desc:    "disconnect with event store failure",
+        session: &sessionClient,
+        topic:   topics,
+        eventStoreErr: errors.New("event store error"),
+        err:     mqtt.ErrFailedPublishDisconnectEvent,
     },
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0d127da and a7e9e5d.

📒 Files selected for processing (5)
  • cmd/coap/main.go (3 hunks)
  • cmd/http/main.go (3 hunks)
  • mqtt/events/events.go (1 hunks)
  • mqtt/handler.go (5 hunks)
  • mqtt/handler_test.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • cmd/http/main.go
  • mqtt/events/events.go
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: Lint and Build
  • GitHub Check: api-test
🔇 Additional comments (6)
cmd/coap/main.go (2)

24-24: LGTM!

The import for the messaging events package is correctly added and follows Go conventions.


51-51: Consider separating event store and message broker URLs.

The event store (ESURL) defaults to the same NATS URL as the message broker. While this might work, it could be better to:

  1. Use a different default URL to avoid potential confusion
  2. Add validation for the URL format
  3. Add documentation explaining the purpose of this configuration

Let's check if the event store URL is documented elsewhere:

mqtt/handler.go (4)

55-55: LGTM!

The new error variable is well-defined and follows the existing error naming convention.


110-112: LGTM!

The error handling for the Connect event is appropriate. The error is logged but doesn't block the connection, which is a good practice for telemetry operations.


275-290: ⚠️ Potential issue

Fix potential nil pointer dereference in parseTopic.

The function is called with topic parameter without checking if it's nil, which could lead to a panic.

Apply this diff:

 func parseTopic(topic string) (string, string, error) {
+    if len(topic) == 0 {
+        return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
+    }
     channelParts := channelRegExp.FindStringSubmatch(topic)
-    if len(channelParts) < 2 {
+    if len(channelParts) < 3 {
         return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
     }
 
     chanID := channelParts[1]
+    if len(chanID) == 0 {
+        return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
+    }
     subtopic := channelParts[2]

Likely invalid or redundant comment.


206-216: 🛠️ Refactor suggestion

Add error handling for nil topics.

The method should validate the topics parameter before processing.

Apply this diff:

 func (h *handler) Subscribe(ctx context.Context, topics *[]string) error {
     s, ok := session.FromContext(ctx)
     if !ok {
         return errors.Wrap(ErrFailedSubscribe, ErrClientNotInitialized)
     }
+    if topics == nil || *topics == nil {
+        return errors.Wrap(ErrFailedSubscribe, ErrMissingTopicSub)
+    }
+    if len(*topics) == 0 {
+        return nil
+    }

Likely invalid or redundant comment.

Signed-off-by: Felix Gateru <[email protected]>
Signed-off-by: Felix Gateru <[email protected]>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (6)
cmd/coap/main.go (1)

148-154: Consider adding documentation and cleanup handling.

Two suggestions for improved maintainability:

  1. Add a comment explaining the purpose of the event store middleware and its role in telemetry aggregation
  2. Consider implementing cleanup handling for the middleware during service shutdown, similar to how nps.Close() is handled
 	nps = brokerstracing.NewPubSub(coapServerConfig, tracer, nps)

+	// Initialize event store middleware for telemetry aggregation
 	nps, err = msgevents.NewPubSubMiddleware(ctx, nps, cfg.ESURL)
 	if err != nil {
 		logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
 		exitCode = 1
 		return
 	}
mqtt/handler.go (2)

110-112: Consider returning the connection event error.

Currently, the connection event error is only logged. Consider whether this error should be returned to properly handle connection tracking failures.

 	if err := h.es.Connect(ctx, s.Username, s.ID); err != nil {
-		h.logger.Error(errors.Wrap(ErrFailedPublishConnectEvent, err).Error())
+		wrappedErr := errors.Wrap(ErrFailedPublishConnectEvent, err)
+		h.logger.Error(wrappedErr.Error())
+		return wrappedErr
 	}

207-215: Optimize loop with early return.

Consider returning early after the first error to avoid unnecessary iterations.

 	for _, topic := range *topics {
 		channelID, subTopic, err := parseTopic(topic)
 		if err != nil {
-			return err
+			return errors.Wrap(ErrFailedSubscribe, err)
 		}
 		if err := h.es.Subscribe(ctx, s.Username, channelID, s.ID, subTopic); err != nil {
-			return errors.Wrap(ErrFailedSubscribeEvent, err)
+			wrappedErr := errors.Wrap(ErrFailedSubscribeEvent, err)
+			h.logger.Error(fmt.Sprintf("Failed to subscribe client %s to channel %s: %v", s.ID, channelID, err))
+			return wrappedErr
 		}
 	}
ws/adapter_test.go (2)

Line range hint 43-156: Enhance test coverage for telemetry tracking.

Consider adding the following test cases to validate the telemetry aggregation requirements:

  1. Verify that subscriptions from the same client ID are properly aggregated
  2. Test concurrent subscriptions from different client IDs
  3. Validate that telemetry data is correctly associated with the respective clients

Example test case structure:

{
    desc:      "aggregate multiple subscriptions from same client",
    clientKey: clientKey,
    chanID:    chanID,
    subtopic:  subTopic,
    authNRes:  &grpcClientsV1.AuthnRes{Id: clientID, Authenticated: true},
    authZRes:  &grpcChannelsV1.AuthzRes{Authorized: true},
    // Add expectations for telemetry tracking
    err:       nil,
},

Line range hint 166-177: Enhance mock verification for telemetry tracking.

The current mock setup focuses on authentication and subscription. Consider extending it to verify:

  1. Telemetry data collection during subscription
  2. Proper propagation of client identification to telemetry systems
  3. Aggregation of subscription metrics

Example mock setup:

// Add telemetry service mock
telemetry := new(mocks.TelemetryService)
telemetryCall := telemetry.On("TrackSubscription", mock.Anything, mock.MatchedBy(func(req interface{}) bool {
    // Verify client ID and subscription details
    return true
})).Return(nil)
defer telemetryCall.Unset()
journal/postgres/telemetry.go (1)

19-20: Add transaction support for atomic operations.

Consider wrapping the insert operation in a transaction to ensure atomicity, especially since this is creating a new record that other concurrent operations might depend on.

 func (repo *repository) SaveClientTelemetry(ctx context.Context, ct journal.ClientTelemetry) error {
+    tx, err := repo.db.BeginTxx(ctx, nil)
+    if err != nil {
+        return postgres.HandleError(repoerr.ErrCreateEntity, err)
+    }
+    defer tx.Rollback()
+
     q := `INSERT INTO clients_telemetry (client_id, domain_id, inbound_messages, outbound_messages, first_seen, last_seen)
         VALUES (:client_id, :domain_id, :inbound_messages, :outbound_messages, :first_seen, :last_seen);`
     // ... rest of the code ...
-    if _, err := repo.db.NamedExecContext(ctx, q, dbct); err != nil {
+    if _, err := tx.NamedExecContext(ctx, q, dbct); err != nil {
         return postgres.HandleError(repoerr.ErrCreateEntity, err)
     }
-    return nil
+    return tx.Commit()
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a7e9e5d and db903c4.

📒 Files selected for processing (7)
  • cmd/coap/main.go (3 hunks)
  • cmd/http/main.go (3 hunks)
  • journal/postgres/telemetry.go (4 hunks)
  • mqtt/events/events.go (1 hunks)
  • mqtt/handler.go (5 hunks)
  • mqtt/handler_test.go (3 hunks)
  • ws/adapter_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • mqtt/handler_test.go
  • cmd/http/main.go
  • mqtt/events/events.go
🔇 Additional comments (10)
cmd/coap/main.go (3)

24-24: LGTM! Import is well-organized.

The new import for messaging events is correctly placed and appropriately aliased.


51-51: LGTM! Config field is well-defined.

The ESURL field is properly configured with appropriate environment variable and default value.


148-154: LGTM! Event store middleware initialization is properly implemented.

The implementation includes proper error handling and context usage.

mqtt/handler.go (3)

55-55: LGTM: Error variable declaration is correct.

The error variable ErrFailedSubscribeEvent is properly declared and follows the error naming convention.


238-241: ⚠️ Potential issue

Fix logging inconsistency in Disconnect.

The log message uses s.Password while the method uses s.Username. This is inconsistent and might expose sensitive information.

-	h.logger.Error(fmt.Sprintf(LogInfoDisconnected, s.ID, s.Password))
+	h.logger.Info(fmt.Sprintf(LogInfoDisconnected, s.ID, s.Username))
 	if err := h.es.Disconnect(ctx, s.Username, s.ID); err != nil {
 		return errors.Wrap(ErrFailedPublishDisconnectEvent, err)
 	}

Likely invalid or redundant comment.


275-290: 🛠️ Refactor suggestion

Improve parseTopic error handling and validation.

The function could benefit from more precise validation:

  1. The length check should be < 3 since we access index 2
  2. The error wrapping should use a more specific error
 func parseTopic(topic string) (string, string, error) {
 	channelParts := channelRegExp.FindStringSubmatch(topic)
-	if len(channelParts) < 2 {
+	if len(channelParts) < 3 {
 		return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
 	}
 
 	chanID := channelParts[1]
+	if len(chanID) == 0 {
+		return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
+	}
 	subtopic := channelParts[2]
 
 	subtopic, err := parseSubtopic(subtopic)
 	if err != nil {
-		return "", "", errors.Wrap(ErrFailedParseSubtopic, err)
+		return "", "", err // parseSubtopic already wraps the error
 	}
 
 	return chanID, subtopic, nil
 }

Likely invalid or redundant comment.

ws/adapter_test.go (1)

161-164: Clarify the distinction between ID and ClientID fields.

Both fields are set to the same clientID value, which suggests potential redundancy. If these fields serve different purposes, consider:

  1. Adding comments to explain their distinct roles
  2. Using different test values to verify their independent behaviors
  3. Adding test cases that specifically validate telemetry tracking based on ClientID

Let's verify the usage of these fields in the implementation:

✅ Verification successful

Both ID and ClientID fields are correctly used for different purposes

The fields serve distinct roles in the WebSocket adapter:

  • ID: Internal identifier assigned after successful authorization
  • ClientID: Configuration field used for subscriber setup and identification during subscription
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for distinct usages of ID and ClientID fields
echo "Searching for ID field usage..."
rg "ID\s*:" -A 3 --type go

echo "Searching for ClientID field usage..."
rg "ClientID\s*:" -A 3 --type go

echo "Searching for telemetry-related code..."
rg "telemetry" --type go

Length of output: 66234


Script:

#!/bin/bash
# Search for WebSocket adapter code and related structs
echo "Searching for WebSocket adapter structs..."
ast-grep --pattern 'type $_ struct {
  $$$
  ID $_
  $$$
}'  ws/

echo "Searching for subscriber related code..."
ast-grep --pattern 'type $_ struct {
  $$$
  ClientID $_
  $$$
}' ws/

echo "Searching for WebSocket adapter implementation..."
rg "type.*Adapter" --type go ws/
rg "func.*Subscribe" -A 10 --type go ws/

Length of output: 3938

journal/postgres/telemetry.go (3)

35-35: LGTM!

The formatting changes improve consistency with the rest of the codebase.

Also applies to: 53-53


213-218: LGTM!

The DB model changes correctly reflect the schema updates and properly handle nullable fields.


115-128: Handle cascade effects in RemoveSubscription.

When removing a subscription, consider updating the associated client's telemetry record to maintain data consistency.

Comment on lines +83 to +98
func (repo *repository) AddSubscription(ctx context.Context, sub journal.ClientSubscription) error {
q := `INSERT INTO subscriptions (id, subscriber_id, channel_id, subtopic, client_id)
VALUES (:id, :subscriber_id, :channel_id, :subtopic, :client_id);
`

result, err := repo.db.NamedExecContext(ctx, q, sub)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}

return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add transaction support and index for subscription operations.

The subscription insertion should be wrapped in a transaction. Also, consider adding an index on the subscriber_id column for better query performance.

+-- Add to your migration file:
+CREATE INDEX idx_subscriptions_subscriber_id ON subscriptions(subscriber_id);
+CREATE INDEX idx_subscriptions_client_id ON subscriptions(client_id);
+CREATE INDEX idx_subscriptions_channel_subtopic ON subscriptions(channel_id, subtopic);

 func (repo *repository) AddSubscription(ctx context.Context, sub journal.ClientSubscription) error {
+    tx, err := repo.db.BeginTxx(ctx, nil)
+    if err != nil {
+        return postgres.HandleError(repoerr.ErrUpdateEntity, err)
+    }
+    defer tx.Rollback()
+
     q := `INSERT INTO subscriptions (id, subscriber_id, channel_id, subtopic, client_id)
         VALUES (:id, :subscriber_id, :channel_id, :subtopic, :client_id);`
-    result, err := repo.db.NamedExecContext(ctx, q, sub)
+    result, err := tx.NamedExecContext(ctx, q, sub)
     if err != nil {
         return postgres.HandleError(repoerr.ErrUpdateEntity, err)
     }
     if rows, _ := result.RowsAffected(); rows == 0 {
+        if err := tx.Rollback(); err != nil {
+            return errors.Wrap(errors.ErrRollbackTx, err)
+        }
         return repoerr.ErrNotFound
     }
-    return nil
+    return tx.Commit()
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +130 to +157
func (repo *repository) IncrementInboundMessages(ctx context.Context, clientID string) error {
q := `
UPDATE clients_telemetry
SET inbound_messages = inbound_messages + 1,
last_seen = :last_seen
WHERE client_id = :client_id;
`

ct := journal.ClientTelemetry{
ClientID: clientID,
LastSeen: time.Now(),
}
dbct, err := toDBClientsTelemetry(ct)
if err != nil {
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}

result, err := repo.db.NamedExecContext(ctx, q, dbct)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}

return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add transaction support for IncrementInboundMessages.

Similar to IncrementOutboundMessages, this method should use a transaction to ensure atomicity.

 func (repo *repository) IncrementInboundMessages(ctx context.Context, clientID string) error {
+    tx, err := repo.db.BeginTxx(ctx, nil)
+    if err != nil {
+        return postgres.HandleError(repoerr.ErrUpdateEntity, err)
+    }
+    defer tx.Rollback()
+
     // ... rest of the code ...
-    result, err := repo.db.NamedExecContext(ctx, q, dbct)
+    result, err := tx.NamedExecContext(ctx, q, dbct)
     if err != nil {
+        if err := tx.Rollback(); err != nil {
+            return errors.Wrap(errors.ErrRollbackTx, err)
+        }
         return postgres.HandleError(repoerr.ErrUpdateEntity, err)
     }
     // ... rest of the code ...
-    return nil
+    return tx.Commit()
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (repo *repository) IncrementInboundMessages(ctx context.Context, clientID string) error {
q := `
UPDATE clients_telemetry
SET inbound_messages = inbound_messages + 1,
last_seen = :last_seen
WHERE client_id = :client_id;
`
ct := journal.ClientTelemetry{
ClientID: clientID,
LastSeen: time.Now(),
}
dbct, err := toDBClientsTelemetry(ct)
if err != nil {
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}
result, err := repo.db.NamedExecContext(ctx, q, dbct)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}
return nil
}
func (repo *repository) IncrementInboundMessages(ctx context.Context, clientID string) error {
tx, err := repo.db.BeginTxx(ctx, nil)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
defer tx.Rollback()
q := `
UPDATE clients_telemetry
SET inbound_messages = inbound_messages + 1,
last_seen = :last_seen
WHERE client_id = :client_id;
`
ct := journal.ClientTelemetry{
ClientID: clientID,
LastSeen: time.Now(),
}
dbct, err := toDBClientsTelemetry(ct)
if err != nil {
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}
result, err := tx.NamedExecContext(ctx, q, dbct)
if err != nil {
if err := tx.Rollback(); err != nil {
return errors.Wrap(errors.ErrRollbackTx, err)
}
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}
return tx.Commit()
}

Comment on lines +159 to +210
func (repo *repository) IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error {
query := `
SELECT client_id, COUNT(*) AS match_count
FROM subscriptions
WHERE channel_id = :channel_id AND subtopic = :subtopic
GROUP BY client_id
`
sb := journal.ClientSubscription{
ChannelID: channelID,
Subtopic: subtopic,
}

rows, err := repo.db.NamedQueryContext(ctx, query, sb)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
defer rows.Close()

tx, err := repo.db.BeginTxx(ctx, nil)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

q := `UPDATE clients_telemetry
SET outbound_messages = outbound_messages + $1
WHERE client_id = $2;
`

for rows.Next() {
var clientID string
var count uint64
if err = rows.Scan(&clientID, &count); err != nil {
if err := tx.Rollback(); err != nil {
return errors.Wrap(errors.ErrRollbackTx, err)
}
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

if _, err = repo.db.ExecContext(ctx, q, count, clientID); err != nil {
if err := tx.Rollback(); err != nil {
return errors.Wrap(errors.ErrRollbackTx, err)
}
return errors.Wrap(errors.ErrRollbackTx, err)
}
}

if err = tx.Commit(); err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix error handling and consider batch updates in IncrementOutboundMessages.

  1. The error handling in line 201 is incorrect - it's returning the rollback error instead of the original error.
  2. Consider using batch updates for better performance when updating multiple clients.

Fix the error handling:

-            return errors.Wrap(errors.ErrRollbackTx, err)
+            return postgres.HandleError(repoerr.ErrUpdateEntity, err)

Consider implementing batch updates:

     q := `UPDATE clients_telemetry
-        SET outbound_messages = outbound_messages + $1
-        WHERE client_id = $2;
+        SET outbound_messages = outbound_messages + v.count
+        FROM (VALUES %s) AS v(client_id, count)
+        WHERE clients_telemetry.client_id = v.client_id;
         `
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (repo *repository) IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error {
query := `
SELECT client_id, COUNT(*) AS match_count
FROM subscriptions
WHERE channel_id = :channel_id AND subtopic = :subtopic
GROUP BY client_id
`
sb := journal.ClientSubscription{
ChannelID: channelID,
Subtopic: subtopic,
}
rows, err := repo.db.NamedQueryContext(ctx, query, sb)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
defer rows.Close()
tx, err := repo.db.BeginTxx(ctx, nil)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
q := `UPDATE clients_telemetry
SET outbound_messages = outbound_messages + $1
WHERE client_id = $2;
`
for rows.Next() {
var clientID string
var count uint64
if err = rows.Scan(&clientID, &count); err != nil {
if err := tx.Rollback(); err != nil {
return errors.Wrap(errors.ErrRollbackTx, err)
}
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
if _, err = repo.db.ExecContext(ctx, q, count, clientID); err != nil {
if err := tx.Rollback(); err != nil {
return errors.Wrap(errors.ErrRollbackTx, err)
}
return errors.Wrap(errors.ErrRollbackTx, err)
}
}
if err = tx.Commit(); err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
return nil
}
func (repo *repository) IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error {
query := `
SELECT client_id, COUNT(*) AS match_count
FROM subscriptions
WHERE channel_id = :channel_id AND subtopic = :subtopic
GROUP BY client_id
`
sb := journal.ClientSubscription{
ChannelID: channelID,
Subtopic: subtopic,
}
rows, err := repo.db.NamedQueryContext(ctx, query, sb)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
defer rows.Close()
tx, err := repo.db.BeginTxx(ctx, nil)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
q := `UPDATE clients_telemetry
SET outbound_messages = outbound_messages + $1
WHERE client_id = $2;
`
for rows.Next() {
var clientID string
var count uint64
if err = rows.Scan(&clientID, &count); err != nil {
if err := tx.Rollback(); err != nil {
return errors.Wrap(errors.ErrRollbackTx, err)
}
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
if _, err = repo.db.ExecContext(ctx, q, count, clientID); err != nil {
if err := tx.Rollback(); err != nil {
return errors.Wrap(errors.ErrRollbackTx, err)
}
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
}
if err = tx.Commit(); err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
return nil
}

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (4)
mqtt/events/streams.go (2)

33-47: Add validation for instance parameter.

Consider validating the instance parameter to ensure it's not empty.

 func NewEventStoreMiddleware(ctx context.Context, handler session.Handler, url, instance string) (session.Handler, error) {
+    if instance == "" {
+        return nil, errors.New("instance cannot be empty")
+    }
     publisher, err := store.NewPublisher(ctx, url, streamID)

83-112: Add nil check for topics parameter.

Consider adding validation for the topics parameter to prevent potential nil pointer dereference.

 func (es *eventStore) Subscribe(ctx context.Context, topics *[]string) error {
+    if topics == nil {
+        return errors.New("topics cannot be nil")
+    }
     if err := es.handler.Subscribe(ctx, topics); err != nil {
cmd/mqtt/main.go (2)

35-35: Consider clarifying the import alias to avoid confusion with the existing events package.

Currently, there are two distinct packages aliased as msgevents and events. While functionally correct, it may create some confusion. Consider using more descriptive aliases, such as messagingevents or mqttpubevents, to improve clarity.


209-216: Consider providing a fallback or graceful degradation if the event store is unavailable.

Currently, any error initializing the event store middleware halts the entire service. If aggregator telemetry is not strictly mandatory for core service operation, consider an optional fallback mode to maintain partial functionality.

Would you like me to draft a fallback strategy or open a new issue to explore graceful degradation for the event store middleware?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between db903c4 and e300f15.

📒 Files selected for processing (6)
  • .github/workflows/check-generated-files.yml (0 hunks)
  • cmd/mqtt/main.go (4 hunks)
  • mqtt/events/streams.go (1 hunks)
  • mqtt/handler.go (5 hunks)
  • mqtt/handler_test.go (2 hunks)
  • mqtt/mocks/events.go (0 hunks)
💤 Files with no reviewable changes (2)
  • .github/workflows/check-generated-files.yml
  • mqtt/mocks/events.go
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Run tests
🔇 Additional comments (7)
mqtt/events/streams.go (3)

20-24: LGTM!

The error variables and regular expression pattern are well-defined and descriptive.


28-31: LGTM!

The struct fields are well-named and correctly typed.


48-65: LGTM!

The method has comprehensive error handling and appropriate event publishing.

mqtt/handler.go (2)

54-54: LGTM!

The error variable is well-defined and follows the established error message pattern.


Line range hint 72-78: LGTM!

The handler initialization is correct and the parameter order is logical.

mqtt/handler_test.go (2)

71-72: LGTM!

Moving mock variables to global scope is appropriate for test setup.


530-530: LGTM!

The handler initialization is correct and test coverage is comprehensive.

Comment on lines +138 to +173
func parseTopic(topic string) (string, string, error) {
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 2 {
return "", "", errMalformedTopic
}

chanID := channelParts[1]
subtopic := channelParts[2]

if subtopic == "" {
return subtopic, chanID, nil
}

subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", "", errMalformedTopic
}
subtopic = strings.ReplaceAll(subtopic, "/", ".")

elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}

if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", "", errMalformedTopic
}

filteredElems = append(filteredElems, elem)
}

subtopic = strings.Join(filteredElems, ".")

return chanID, subtopic, nil
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix return value order in parseTopic function.

The function returns subtopic, chanID on line 148 but chanID, subtopic on line 171. This inconsistency could lead to confusion.

-    return subtopic, chanID, nil
+    return chanID, subtopic, nil
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func parseTopic(topic string) (string, string, error) {
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 2 {
return "", "", errMalformedTopic
}
chanID := channelParts[1]
subtopic := channelParts[2]
if subtopic == "" {
return subtopic, chanID, nil
}
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", "", errMalformedTopic
}
subtopic = strings.ReplaceAll(subtopic, "/", ".")
elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", "", errMalformedTopic
}
filteredElems = append(filteredElems, elem)
}
subtopic = strings.Join(filteredElems, ".")
return chanID, subtopic, nil
func parseTopic(topic string) (string, string, error) {
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 2 {
return "", "", errMalformedTopic
}
chanID := channelParts[1]
subtopic := channelParts[2]
if subtopic == "" {
return chanID, subtopic, nil
}
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", "", errMalformedTopic
}
subtopic = strings.ReplaceAll(subtopic, "/", ".")
elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", "", errMalformedTopic
}
filteredElems = append(filteredElems, elem)
}
subtopic = strings.Join(filteredElems, ".")
return chanID, subtopic, nil

Comment on lines +170 to +172
np, err = msgevents.NewPublisherMiddleware(ctx, np, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create %s event store : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Revisit closing the final wrapped np publisher.

The same pattern applies here as with mpub: the newly wrapped publisher should be properly closed. Ensure the defer is placed after the publisher is wrapped by msgevents.NewPublisherMiddleware.

Comment on lines +146 to +151
mpub, err = msgevents.NewPublisherMiddleware(ctx, mpub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure the final wrapped publisher is properly closed upon exit.

After reassigning mpub to the wrapped instance returned by msgevents.NewPublisherMiddleware, the previously deferred mpub.Close() will only close the original publisher object. To avoid potential resource leaks, defer the close of the final wrapped publisher instead:

mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
 if err != nil {
   logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
   exitCode = 1
   return
 }
-defer mpub.Close() // This closes the *old* mpub, not the post-middleware mpub
+// Remove the above defer and add it after wrapping
 mpub, err = msgevents.NewPublisherMiddleware(ctx, mpub, cfg.ESURL)
 if err != nil {
   logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
   exitCode = 1
   return
 }
+defer mpub.Close() // This now closes the *wrapped* mpub

Committable suggestion skipped: line range outside the PR's diff.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant